零度AI
高级 · 22 分钟阅读

Claude Code 消息队列

使用 Claude Code 实现 RabbitMQ、Kafka 消息队列,处理异步任务和解耦服务

标签:Claude Code、消息队列、RabbitMQ、Kafka、异步

消息队列概述

消息队列用于异步处理、服务解耦和流量削峰。Claude Code 可以帮你实现 RabbitMQ 和 Kafka 的集成。

队列适用场景

消息队列常用于以下场景:发送邮件、发送通知、数据处理、任务调度、服务解耦。

RabbitMQ 实现

Docker 部署

bash
帮我创建一个 RabbitMQ 的 Docker 配置。
yaml
# docker-compose.message.yml
# Single RabbitMQ broker with the management plugin enabled.
version: "3.9"

services:
  rabbitmq:
    image: rabbitmq:3.12-management-alpine
    container_name: rabbitmq
    ports:
      - "5672:5672"   # AMQP protocol port
      - "15672:15672" # management UI port
    environment:
      RABBITMQ_DEFAULT_USER: admin
      # Required variable: abort `docker compose up` with a clear message
      # instead of silently interpolating an empty password.
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD:?RABBITMQ_PASSWORD must be set}
      RABBITMQ_DEFAULT_VHOST: /
    volumes:
      # Named volume so broker data survives container re-creation.
      - rabbitmq_data:/var/lib/rabbitmq
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"]
      interval: 30s
      timeout: 10s
      retries: 5

volumes:
  rabbitmq_data:

生产者实现

javascript
// lib/queue/rabbitmq/producer.js
import amqp from "amqplib";

// Broker URL. The fallback targets a local development broker only; set
// RABBITMQ_URL in every real environment.
const RABBITMQ_URL = process.env.RABBITMQ_URL || "amqp://admin:admin@localhost:5672";

// Lazily created by connect() and reset to null when the connection closes.
let connection = null;
let channel = null;

// Exchange names. Exported (and frozen) so the consumer module can bind its
// queues against the same names instead of re-typing string literals.
export const EXCHANGES = Object.freeze({
  USER: "user.events",
  ORDER: "order.events",
  NOTIFICATION: "notification.events",
});

// Routing keys used when publishing to the exchanges above.
export const ROUTING_KEYS = Object.freeze({
  USER_CREATED: "user.created",
  USER_UPDATED: "user.updated",
  ORDER_CREATED: "order.created",
  ORDER_PAID: "order.paid",
  EMAIL_SEND: "email.send",
  SMS_SEND: "sms.send",
});

// In-flight connection attempt, shared so that concurrent callers wait for the
// same connection instead of each opening their own.
let connecting = null;

/**
 * Open (or reuse) the shared RabbitMQ connection and channel and declare the
 * three durable topic exchanges.
 *
 * @returns {Promise<{connection: object, channel: object}>} the shared
 *   connection and channel.
 * @throws if the broker cannot be reached or an exchange cannot be declared;
 *   in that case nothing is cached and the next call retries from scratch.
 */
export async function connect() {
  if (connection && channel) return { connection, channel };
  if (connecting) return connecting;

  connecting = (async () => {
    const conn = await amqp.connect(RABBITMQ_URL);

    try {
      // An "error" event without a listener is thrown by EventEmitter and
      // would take the whole process down.
      conn.on("error", (error) => {
        console.error("RabbitMQ connection error:", error);
      });

      conn.on("close", () => {
        console.log("RabbitMQ connection closed");
        // Only clear the cache if it still points at this connection.
        if (connection === conn) {
          connection = null;
          channel = null;
        }
      });

      const ch = await conn.createChannel();
      ch.on("error", (error) => {
        console.error("RabbitMQ channel error:", error);
      });

      // Declare the exchanges.
      await ch.assertExchange(EXCHANGES.USER, "topic", { durable: true });
      await ch.assertExchange(EXCHANGES.ORDER, "topic", { durable: true });
      await ch.assertExchange(EXCHANGES.NOTIFICATION, "topic", { durable: true });

      // Publish the pair only once it is fully usable, so a failed setup
      // never leaves a half-initialized connection cached.
      connection = conn;
      channel = ch;
      console.log("RabbitMQ connected");

      return { connection, channel };
    } catch (error) {
      // Best-effort cleanup; the setup error below is the one that matters.
      await conn.close().catch((closeError) => {
        console.error("Failed to close RabbitMQ connection after setup error:", closeError);
      });
      throw error;
    }
  })();

  try {
    return await connecting;
  } finally {
    connecting = null;
  }
}

/**
 * Publish a persistent JSON message to an exchange.
 *
 * The message is wrapped in an envelope `{ payload, timestamp, messageId }`
 * and sent with an `x-retry-count` header of 0.
 *
 * @param {string} exchange - target exchange name.
 * @param {string} routingKey - routing key for the message.
 * @param {*} message - JSON-serializable payload.
 * @returns {Promise<void>} resolves once the message has been handed to the
 *   channel and, if the channel's write buffer was full, after it drained.
 * @throws if connecting fails or the channel closes while waiting for drain.
 */
export async function publish(exchange, routingKey, message) {
  const { channel } = await connect();

  const envelope = {
    payload: message,
    timestamp: Date.now(),
    messageId: generateMessageId(),
  };
  const messageBuffer = Buffer.from(JSON.stringify(envelope));

  const canKeepWriting = channel.publish(exchange, routingKey, messageBuffer, {
    persistent: true,
    contentType: "application/json",
    headers: {
      "x-retry-count": 0,
    },
  });

  // `false` means the write buffer is full: respect back-pressure and wait
  // for "drain" before letting the caller publish more.
  if (!canKeepWriting) {
    await new Promise((resolve, reject) => {
      const onDrain = () => {
        channel.removeListener("close", onClose);
        resolve();
      };
      const onClose = () => {
        channel.removeListener("drain", onDrain);
        reject(new Error("RabbitMQ channel closed before it drained"));
      };
      channel.once("drain", onDrain);
      channel.once("close", onClose);
    });
  }

  console.log(`Message sent to ${exchange} with key ${routingKey}`);
}

/**
 * Publish a user event to the user exchange.
 *
 * @param {string} routingKey - routing key, e.g. "user.created".
 * @param {object} userData - event payload.
 * @returns {Promise<void>}
 */
export async function publishUserEvent(routingKey, userData) {
  const targetExchange = EXCHANGES.USER;
  return publish(targetExchange, routingKey, userData);
}

/**
 * Publish an order event to the order exchange.
 *
 * @param {string} routingKey - routing key, e.g. "order.created".
 * @param {object} orderData - event payload.
 * @returns {Promise<void>}
 */
export async function publishOrderEvent(routingKey, orderData) {
  const targetExchange = EXCHANGES.ORDER;
  return publish(targetExchange, routingKey, orderData);
}

/**
 * Publish a notification event to the notification exchange.
 *
 * @param {string} routingKey - routing key, e.g. "email.send".
 * @param {object} notificationData - event payload.
 * @returns {Promise<void>}
 */
export async function publishNotification(routingKey, notificationData) {
  const targetExchange = EXCHANGES.NOTIFICATION;
  return publish(targetExchange, routingKey, notificationData);
}

/**
 * Build a message id of the form `<epoch-ms>-<random base-36 suffix>`.
 *
 * The suffix is up to 9 characters taken from `Math.random()`; it is not
 * cryptographically secure and is only meant to tell messages apart.
 *
 * @returns {string} the generated id.
 */
function generateMessageId() {
  // slice(2, 11) drops the leading "0." and keeps at most 9 characters — the
  // same result as the deprecated substr(2, 9).
  const randomSuffix = Math.random().toString(36).slice(2, 11);
  return `${Date.now()}-${randomSuffix}`;
}

消费者实现

javascript
// lib/queue/rabbitmq/consumer.js
import amqp from "amqplib";
import { connect } from "./producer.js";

// Names of the queues this module declares and consumes from.
const QUEUES = {
  EMAIL: "email.queue",
  SMS: "sms.queue",
  USER_INDEX: "user.index.queue",
  ORDER_NOTIFY: "order.notify.queue",
};

// `channel` is assigned by initQueues() and consume() from the shared
// connection; `connection` is never assigned in the visible code.
let connection = null;
let channel = null;

// Dead-letter topology: messages rejected without requeue are routed here
// instead of being dropped by the broker.
const DEAD_LETTER_EXCHANGE = "dead.letter";
const DEAD_LETTER_QUEUE = "dead.letter.queue";

// Queue -> exchange bindings. The exchange names and patterns are spelled out
// here because this module does not import them; they must stay in sync with
// the exchanges declared by connect() in producer.js.
const QUEUE_BINDINGS = [
  { queue: QUEUES.EMAIL, exchange: "notification.events", pattern: "email.send" },
  { queue: QUEUES.SMS, exchange: "notification.events", pattern: "sms.send" },
  { queue: QUEUES.USER_INDEX, exchange: "user.events", pattern: "user.*" },
  { queue: QUEUES.ORDER_NOTIFY, exchange: "order.events", pattern: "order.*" },
];

/**
 * Declare the durable work queues, the dead-letter queue, and all bindings.
 *
 * NOTE: RabbitMQ refuses to re-declare an existing queue with different
 * arguments, so queues created earlier without a dead-letter exchange have to
 * be deleted before this runs.
 *
 * @returns {Promise<void>}
 * @throws if connecting fails or any declaration/binding is rejected.
 */
export async function initQueues() {
  const { channel: ch } = await connect();
  channel = ch;

  // Dead-letter exchange and the queue that collects failed messages.
  await ch.assertExchange(DEAD_LETTER_EXCHANGE, "fanout", { durable: true });
  await ch.assertQueue(DEAD_LETTER_QUEUE, { durable: true });
  await ch.bindQueue(DEAD_LETTER_QUEUE, DEAD_LETTER_EXCHANGE, "");

  // Declare the work queues, each dead-lettering into the exchange above.
  for (const { queue } of QUEUE_BINDINGS) {
    await ch.assertQueue(queue, {
      durable: true,
      deadLetterExchange: DEAD_LETTER_EXCHANGE,
    });
  }

  // Bind the work queues to their exchanges.
  for (const { queue, exchange, pattern } of QUEUE_BINDINGS) {
    await ch.bindQueue(queue, exchange, pattern);
  }

  console.log("Queues initialized");
}

/**
 * Consume messages from a queue one at a time, with bounded retries.
 *
 * Each message body is expected to be a JSON envelope whose `payload` field
 * is passed to `handler`. When the handler throws, the message is re-queued
 * onto the same queue (after an exponential delay) with an incremented
 * `x-retry-count` header, up to 3 times; after that it is rejected without
 * requeue. Bodies that are not valid JSON are rejected immediately.
 *
 * @param {string} queue - name of the queue to consume from.
 * @param {(payload: *) => Promise<void>} handler - message handler.
 * @returns {Promise<void>} resolves once the consumer is registered.
 * @throws if connecting fails or the broker refuses the consumer.
 */
export async function consume(queue, handler) {
  const maxRetries = 3;

  const { channel: ch } = await connect();
  channel = ch;

  await ch.prefetch(1); // handle a single unacknowledged message at a time

  // Awaited so that a refused consumer (e.g. missing queue) rejects here
  // instead of becoming an unhandled rejection.
  await ch.consume(queue, async (msg) => {
    if (!msg) return;

    // Acks must go to the channel that delivered the message, so the callback
    // uses `ch`, not the reassignable module-level `channel`.
    let payload;
    try {
      ({ payload } = JSON.parse(msg.content.toString()));
    } catch (error) {
      // Retrying cannot fix a malformed body; reject it right away so it does
      // not sit unacknowledged and block the queue.
      console.error(`Discarding malformed message from ${queue}:`, error);
      ch.nack(msg, false, false);
      return;
    }

    const retryCount = msg.properties.headers?.["x-retry-count"] || 0;

    try {
      await handler(payload);
      ch.ack(msg);
      console.log(`Message processed from ${queue}`);
    } catch (error) {
      console.error(`Error processing message:`, error);

      if (retryCount < maxRetries) {
        // Exponential back-off: 1s, 2s, 4s.
        await new Promise((resolve) => setTimeout(resolve, Math.pow(2, retryCount) * 1000));

        // Re-queue directly onto this queue. Republishing through the
        // original topic exchange would deliver the retry to every queue
        // bound to it, not just the one whose handler failed.
        ch.sendToQueue(queue, msg.content, {
          ...msg.properties,
          headers: {
            ...msg.properties.headers,
            "x-retry-count": retryCount + 1,
          },
        });
        ch.ack(msg);
      } else {
        // Retries exhausted: reject without requeue so the broker
        // dead-letters the message if the queue has a dead-letter exchange.
        console.error(`Message failed after ${retryCount} retries, moving to DLQ`);
        ch.nack(msg, false, false);
      }
    }
  });
}

/**
 * Start consuming the email queue. The handler currently only logs the
 * recipient and subject; the actual send is left as a placeholder.
 *
 * @returns {Promise<void>}
 */
export async function startEmailConsumer() {
  const handleEmail = async (payload) => {
    const { to, subject, body } = payload;
    console.log(`Sending email to ${to}: ${subject}`);
    // await sendEmail(to, subject, body);
  };

  await consume(QUEUES.EMAIL, handleEmail);
}

/**
 * Start consuming the user-index queue. The handler currently only logs the
 * user id; the indexing call is left as a placeholder.
 *
 * @returns {Promise<void>}
 */
export async function startUserIndexConsumer() {
  const handleUserIndex = async (payload) => {
    console.log(`Indexing user: ${payload.id}`);
    // await searchService.indexUser(payload);
  };

  await consume(QUEUES.USER_INDEX, handleUserIndex);
}

Kafka 实现

Docker 部署

yaml
# docker-compose.kafka.yml
# One ZooKeeper node plus one Kafka broker.
version: "3.9"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # NOTE(review): advertising localhost:9092 presumably targets clients
      # running on the Docker host; other containers would likely need an
      # additional listener — confirm before reusing this file.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      # Replication factors and min ISR are all 1 because there is only one broker.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

Kafka 客户端

javascript
// lib/queue/kafka/producer.js
import { Kafka, Partitioners } from "kafkajs";

// Kafka client. Brokers come from the comma-separated KAFKA_BROKERS variable,
// falling back to a single local broker.
const kafka = new Kafka({
  clientId: "myapp",
  brokers: process.env.KAFKA_BROKERS?.split(",") || ["localhost:9092"],
});

// Shared producer instance, created with the legacy partitioner.
const producer = kafka.producer({
  createPartitioner: Partitioners.LegacyPartitioner,
});

/**
 * Connect the shared Kafka producer and log once the connection is up.
 *
 * @returns {Promise<void>}
 */
export async function connectProducer() {
  const pendingConnection = producer.connect();
  await pendingConnection;

  console.log("Kafka producer connected");
}

/**
 * Send one or more messages to a Kafka topic.
 *
 * Each message value is wrapped in a JSON envelope
 * `{ payload, timestamp, messageId }`.
 *
 * @param {string} topic - target topic.
 * @param {{key: *, value: *, headers?: object} | Array<{key: *, value: *, headers?: object}>} messages
 *   a single message or an array of messages.
 * @returns {Promise<void>}
 */
export async function sendMessage(topic, messages) {
  // Callers in this file pass a single message object, so accept both a bare
  // message and an array instead of failing on `.map`.
  const batch = Array.isArray(messages) ? messages : [messages];

  await connectProducer();

  const records = batch.map((msg) => ({
    key: msg.key,
    value: JSON.stringify({
      payload: msg.value,
      timestamp: Date.now(),
      messageId: generateMessageId(),
    }),
    headers: msg.headers || {},
  }));

  await producer.send({ topic, messages: records });
}

/**
 * Send a user event to the "user-events" topic, keyed by the user's id.
 *
 * @param {string} eventType - event name.
 * @param {{id: *}} userData - user data; `id` becomes the message key.
 * @returns {Promise<void>}
 */
export async function sendUserEvent(eventType, userData) {
  // sendMessage iterates over its second argument, so the single message has
  // to be wrapped in an array.
  await sendMessage("user-events", [
    {
      key: userData.id,
      value: {
        eventType,
        data: userData,
      },
    },
  ]);
}

/**
 * Send an order event to the "order-events" topic, keyed by the order's id.
 *
 * @param {string} eventType - event name.
 * @param {{id: *}} orderData - order data; `id` becomes the message key.
 * @returns {Promise<void>}
 */
export async function sendOrderEvent(eventType, orderData) {
  // sendMessage iterates over its second argument, so the single message has
  // to be wrapped in an array.
  await sendMessage("order-events", [
    {
      key: orderData.id,
      value: {
        eventType,
        data: orderData,
      },
    },
  ]);
}

/**
 * Build a message id of the form `<epoch-ms>-<random base-36 suffix>`.
 *
 * The suffix is up to 9 characters taken from `Math.random()`; it is not
 * cryptographically secure and is only meant to tell messages apart.
 *
 * @returns {string} the generated id.
 */
function generateMessageId() {
  // slice(2, 11) drops the leading "0." and keeps at most 9 characters — the
  // same result as the deprecated substr(2, 9).
  const randomSuffix = Math.random().toString(36).slice(2, 11);
  return `${Date.now()}-${randomSuffix}`;
}

Kafka 消费者

javascript
// lib/queue/kafka/consumer.js
import { Kafka } from "kafkajs";

// Kafka client for the consumer side. Brokers come from the comma-separated
// KAFKA_BROKERS variable, falling back to a single local broker.
const kafka = new Kafka({
  clientId: "myapp-consumer",
  brokers: process.env.KAFKA_BROKERS?.split(",") || ["localhost:9092"],
});

// Shared consumer instance in the "myapp-group" consumer group.
const consumer = kafka.consumer({ groupId: "myapp-group" });

/**
 * Connect the shared consumer, subscribe to `topics`, and dispatch each
 * message to the handler registered for its topic.
 *
 * Messages for topics without a handler are ignored. Messages with no value
 * or with a value that is not valid JSON are logged and skipped. Handler
 * errors are logged and not rethrown.
 *
 * @param {string[]} topics - topics to subscribe to.
 * @param {Object<string, (payload: *, meta: {topic: string, partition: number, offset: string, timestamp: string}) => Promise<void>>} handlers
 *   handlers keyed by topic name.
 * @returns {Promise<void>}
 */
export async function startConsumer(topics, handlers) {
  await consumer.connect();
  console.log("Kafka consumer connected");

  await consumer.subscribe({ topics, fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const handler = handlers[topic];
      if (!handler) return;

      const position = `${topic}[${partition}]@${message.offset}`;

      // A message may carry no value at all; there is nothing to parse.
      if (message.value == null) {
        console.error(`Skipping message without a value at ${position}`);
        return;
      }

      // Parse defensively: an exception escaping eachMessage for a malformed
      // body would fail on the same message again and again.
      let envelope;
      try {
        envelope = JSON.parse(message.value.toString());
      } catch (error) {
        console.error(`Skipping malformed message at ${position}:`, error);
        return;
      }

      const { payload, messageId } = envelope ?? {};

      try {
        await handler(payload, {
          topic,
          partition,
          offset: message.offset,
          timestamp: message.timestamp,
        });
        console.log(`Message ${messageId} processed from ${topic}`);
      } catch (error) {
        console.error(`Error processing message ${messageId}:`, error);
        // Retry logic can be added here.
      }
    },
  });
}

// Usage example: one handler per topic, keyed by topic name. Each handler
// currently only logs the event; the service calls are placeholders.
const handlers = {
  async "user-events"({ eventType, data }) {
    console.log(`Processing user event: ${eventType}`, data);
    // await userService.processEvent(eventType, data);
  },

  async "order-events"({ eventType, data }) {
    console.log(`Processing order event: ${eventType}`, data);
    // await orderService.processEvent(eventType, data);
  },
};

任务队列集成

Bull 队列(Redis)

javascript
// lib/queue/bull.js
import Bull from "bull";

// Redis settings shared by every queue, so they all authenticate the same
// way. The port is converted to a number and falls back to 6379 when the
// variable is unset or not numeric.
const redisConfig = {
  host: process.env.REDIS_HOST || "localhost",
  port: Number(process.env.REDIS_PORT) || 6379,
  password: process.env.REDIS_PASSWORD,
};

// Queue for outgoing email jobs.
const emailQueue = new Bull("email", {
  redis: { ...redisConfig },
});

// Queue for image-processing jobs.
const imageProcessQueue = new Bull("image-process", {
  redis: { ...redisConfig },
});

// Email job processor: logs the job and reports it as sent. The actual send
// is left as a placeholder.
emailQueue.process(async ({ id, data }) => {
  const { to, subject, body, attachments } = data;
  console.log(`Processing email job ${id}: to=${to}`);

  // Real delivery goes here:
  // await sendEmail({ to, subject, body, attachments });

  const outcome = { sent: true, to };
  return outcome;
});

// Image job processor: logs the requested operations and reports the image as
// processed. The actual processing is left as a placeholder.
imageProcessQueue.process(async ({ data }) => {
  const { imageId, operations } = data;
  console.log(`Processing image ${imageId}:`, operations);

  // Real processing goes here:
  // await processImage(imageId, operations);

  const outcome = { processed: true, imageId };
  return outcome;
});

/**
 * Enqueue an email job. Defaults to 3 attempts with exponential back-off
 * starting at 1000 ms; any key in `options` overrides the defaults.
 *
 * @param {object} emailData - job data.
 * @param {object} [options={}] - job options merged over the defaults.
 * @returns {Promise<object>} whatever `emailQueue.add` resolves to.
 */
export async function addEmailJob(emailData, options = {}) {
  const defaultOptions = {
    attempts: 3,
    backoff: { type: "exponential", delay: 1000 },
  };
  const jobOptions = { ...defaultOptions, ...options };

  return emailQueue.add(emailData, jobOptions);
}

/**
 * Enqueue an image-processing job. Defaults to the priority from
 * `imageData.priority` (0 when falsy), 2 attempts, and a 30000 ms timeout;
 * any key in `options` overrides the defaults.
 *
 * @param {{priority?: number}} imageData - job data.
 * @param {object} [options={}] - job options merged over the defaults.
 * @returns {Promise<object>} whatever `imageProcessQueue.add` resolves to.
 */
export async function addImageProcessJob(imageData, options = {}) {
  const defaultOptions = {
    priority: imageData.priority || 0,
    attempts: 2,
    timeout: 30000,
  };
  const jobOptions = { ...defaultOptions, ...options };

  return imageProcessQueue.add(imageData, jobOptions);
}

// Event listeners: log the outcome of every email job.
function logEmailJobCompleted(job, result) {
  console.log(`Email job ${job.id} completed:`, result);
}

function logEmailJobFailed(job, error) {
  console.error(`Email job ${job.id} failed:`, error.message);
}

emailQueue.on("completed", logEmailJobCompleted);

emailQueue.on("failed", logEmailJobFailed);

Express 集成

路由中使用队列

javascript
// routes/user.js
import express from "express";
import { publishUserEvent } from "../lib/queue/rabbitmq/producer.js";
import { addEmailJob } from "../lib/queue/bull.js";

const router = express.Router();

/**
 * POST /users — create a user, then publish a "user.created" event and
 * enqueue a welcome email.
 *
 * The user is created first. Publishing and enqueueing are follow-up work:
 * if either fails the failure is logged and the request still succeeds,
 * because the user already exists and a 500 would invite a retry that
 * creates a duplicate.
 *
 * Responses: `{ success: true, data: user }` on success;
 * 500 `{ error: message }` when user creation fails.
 */
// NOTE(review): UserService is not imported in the visible part of this file —
// confirm where it comes from.
router.post("/users", async (req, res) => {
  let user;
  try {
    user = await UserService.create(req.body);
  } catch (error) {
    res.status(500).json({ error: error.message });
    return;
  }

  // The two follow-up tasks are independent, so run them together and
  // collect every outcome instead of stopping at the first failure.
  const followUps = await Promise.allSettled([
    // Publish the user-created event.
    publishUserEvent("user.created", {
      id: user.id,
      email: user.email,
      username: user.username,
    }),
    // Enqueue the welcome email.
    addEmailJob({
      to: user.email,
      subject: "欢迎注册",
      body: `您好 ${user.username},欢迎注册!`,
    }),
  ]);

  for (const outcome of followUps) {
    if (outcome.status === "rejected") {
      console.error(`Follow-up task failed for user ${user.id}:`, outcome.reason);
    }
  }

  res.json({ success: true, data: user });
});

export default router;

常见模式

1. 发布/订阅模式

javascript
// RabbitMQ publish/subscribe
// Multiple consumers can subscribe to the same exchange and each receive the same message
await channel.assertExchange("notifications", "fanout", { durable: true });

2. 路由模式

javascript
// Match messages exactly by routing key
// user.created -> delivered to the user-created handler
// user.updated -> delivered to the user-updated handler
// NOTE(review): the exchange is declared as "topic" although the comment
// describes exact matching — confirm whether "direct" was intended.
await channel.assertExchange("user", "topic", { durable: true });
await channel.bindQueue(userQueue, "user", "user.created");

3. 工作队列模式

javascript
// Multiple consumers share one queue; messages are distributed round-robin among them
await channel.assertQueue("tasks", { durable: true });
// Several workers consume at the same time

消息队列注意事项

  • 消息处理要保证幂等性
  • 设置合理的重试机制
  • 监控队列堆积情况
  • 考虑消息顺序性需求

总结

使用 Claude Code 实现消息队列:

  • RabbitMQ 适合中小型应用
  • Kafka 适合高吞吐量场景
  • Bull 适合简单的任务队列
  • 合理设计消息格式和路由
  • 实现重试和错误处理
  • 监控队列健康状态