消息队列概述
消息队列用于异步处理、服务解耦和流量削峰。Claude Code 可以帮你实现 RabbitMQ 和 Kafka 的集成。
队列适用场景
发送邮件、发送通知、数据处理、任务调度、服务解耦。
RabbitMQ 实现
Docker 部署
bash
帮我创建一个 RabbitMQ 的 Docker 配置。yaml
# docker-compose.message.yml
version: "3.9"
services:
rabbitmq:
image: rabbitmq:3.12-management-alpine
container_name: rabbitmq
ports:
- "5672:5672" # AMQP 协议端口
- "15672:15672" # 管理界面端口
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
RABBITMQ_DEFAULT_VHOST: /
volumes:
- rabbitmq_data:/var/lib/rabbitmq
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"]
interval: 30s
timeout: 10s
retries: 5
volumes:
rabbitmq_data:生产者实现
javascript
// lib/queue/rabbitmq/producer.js
import amqp from "amqplib";
const RABBITMQ_URL = process.env.RABBITMQ_URL || "amqp://admin:admin@localhost:5672";
let connection = null;
let channel = null;
// 交换机配置
const EXCHANGES = {
USER: "user.events",
ORDER: "order.events",
NOTIFICATION: "notification.events",
};
// 路由键
const ROUTING_KEYS = {
USER_CREATED: "user.created",
USER_UPDATED: "user.updated",
ORDER_CREATED: "order.created",
ORDER_PAID: "order.paid",
EMAIL_SEND: "email.send",
SMS_SEND: "sms.send",
};
// 连接
export async function connect() {
if (connection && channel) return { connection, channel };
connection = await amqp.connect(RABBITMQ_URL);
channel = await connection.createChannel();
// 设置交换机
await channel.assertExchange(EXCHANGES.USER, "topic", { durable: true });
await channel.assertExchange(EXCHANGES.ORDER, "topic", { durable: true });
await channel.assertExchange(EXCHANGES.NOTIFICATION, "topic", { durable: true });
console.log("RabbitMQ connected");
connection.on("close", () => {
console.log("RabbitMQ connection closed");
connection = null;
channel = null;
});
return { connection, channel };
}
// 发送消息
export async function publish(exchange, routingKey, message) {
const { channel } = await connect();
const messageBuffer = Buffer.from(JSON.stringify({
payload: message,
timestamp: Date.now(),
messageId: generateMessageId(),
}));
channel.publish(exchange, routingKey, messageBuffer, {
persistent: true,
contentType: "application/json",
headers: {
"x-retry-count": 0,
},
});
console.log(`Message sent to ${exchange} with key ${routingKey}`);
}
// 用户事件
export async function publishUserEvent(routingKey, userData) {
await publish(EXCHANGES.USER, routingKey, userData);
}
// 订单事件
export async function publishOrderEvent(routingKey, orderData) {
await publish(EXCHANGES.ORDER, routingKey, orderData);
}
// 通知事件
export async function publishNotification(routingKey, notificationData) {
await publish(EXCHANGES.NOTIFICATION, routingKey, notificationData);
}
function generateMessageId() {
return `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}消费者实现
javascript
// lib/queue/rabbitmq/consumer.js
import amqp from "amqplib";
import { connect } from "./producer.js";
const QUEUES = {
EMAIL: "email.queue",
SMS: "sms.queue",
USER_INDEX: "user.index.queue",
ORDER_NOTIFY: "order.notify.queue",
};
let connection = null;
let channel = null;
// 初始化队列
export async function initQueues() {
const { channel: ch } = await connect();
channel = ch;
// 创建队列并绑定交换机
await channel.assertQueue(QUEUES.EMAIL, { durable: true });
await channel.assertQueue(QUEUES.SMS, { durable: true });
await channel.assertQueue(QUEUES.USER_INDEX, { durable: true });
await channel.assertQueue(QUEUES.ORDER_NOTIFY, { durable: true });
// 绑定队列到交换机
await channel.bindQueue(QUEUES.EMAIL, EXCHANGES.NOTIFICATION, ROUTING_KEYS.EMAIL_SEND);
await channel.bindQueue(QUEUES.SMS, EXCHANGES.NOTIFICATION, ROUTING_KEYS.SMS_SEND);
await channel.bindQueue(QUEUES.USER_INDEX, EXCHANGES.USER, "user.*");
await channel.bindQueue(QUEUES.ORDER_NOTIFY, EXCHANGES.ORDER, "order.*");
console.log("Queues initialized");
}
// 消费消息
export async function consume(queue, handler) {
const { channel: ch } = await connect();
channel = ch;
await channel.prefetch(1); // 每次只处理一条消息
channel.consume(queue, async (msg) => {
if (!msg) return;
const content = JSON.parse(msg.content.toString());
const retryCount = msg.properties.headers?.["x-retry-count"] || 0;
try {
await handler(content.payload);
channel.ack(msg);
console.log(`Message processed from ${queue}`);
} catch (error) {
console.error(`Error processing message:`, error);
// 失败重试(最多 3 次)
if (retryCount < 3) {
// 延迟重试
await new Promise((resolve) => setTimeout(resolve, Math.pow(2, retryCount) * 1000));
// 重新发布消息,增加重试计数
channel.publish(
msg.fields.exchange,
msg.fields.routingKey,
msg.content,
{
...msg.properties,
headers: {
...msg.properties.headers,
"x-retry-count": retryCount + 1,
},
}
);
channel.ack(msg);
} else {
// 超过重试次数,进入死信队列
console.error(`Message failed after ${retryCount} retries, moving to DLQ`);
channel.nack(msg, false, false);
}
}
});
}
// 邮件处理
export async function startEmailConsumer() {
await consume(QUEUES.EMAIL, async (payload) => {
const { to, subject, body } = payload;
console.log(`Sending email to ${to}: ${subject}`);
// await sendEmail(to, subject, body);
});
}
// 用户索引处理
export async function startUserIndexConsumer() {
await consume(QUEUES.USER_INDEX, async (payload) => {
console.log(`Indexing user: ${payload.id}`);
// await searchService.indexUser(payload);
});
}Kafka 实现
Docker 部署
yaml
# docker-compose.kafka.yml
version: "3.9"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1Kafka 客户端
javascript
// lib/queue/kafka/producer.js
import { Kafka, Partitioners } from "kafkajs";
const kafka = new Kafka({
clientId: "myapp",
brokers: process.env.KAFKA_BROKERS?.split(",") || ["localhost:9092"],
});
const producer = kafka.producer({
createPartitioner: Partitioners.LegacyPartitioner,
});
export async function connectProducer() {
await producer.connect();
console.log("Kafka producer connected");
}
export async function sendMessage(topic, messages) {
await connectProducer();
await producer.send({
topic,
messages: messages.map((msg) => ({
key: msg.key,
value: JSON.stringify({
payload: msg.value,
timestamp: Date.now(),
messageId: generateMessageId(),
}),
headers: msg.headers || {},
})),
});
}
export async function sendUserEvent(eventType, userData) {
await sendMessage("user-events", {
key: userData.id,
value: {
eventType,
data: userData,
},
});
}
export async function sendOrderEvent(eventType, orderData) {
await sendMessage("order-events", {
key: orderData.id,
value: {
eventType,
data: orderData,
},
});
}
function generateMessageId() {
return `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}Kafka 消费者
javascript
// lib/queue/kafka/consumer.js
import { Kafka } from "kafkajs";
const kafka = new Kafka({
clientId: "myapp-consumer",
brokers: process.env.KAFKA_BROKERS?.split(",") || ["localhost:9092"],
});
const consumer = kafka.consumer({ groupId: "myapp-group" });
export async function startConsumer(topics, handlers) {
await consumer.connect();
console.log("Kafka consumer connected");
await consumer.subscribe({ topics, fromBeginning: false });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const { payload, messageId } = JSON.parse(message.value.toString());
const handler = handlers[topic];
if (handler) {
try {
await handler(payload, {
topic,
partition,
offset: message.offset,
timestamp: message.timestamp,
});
console.log(`Message ${messageId} processed from ${topic}`);
} catch (error) {
console.error(`Error processing message ${messageId}:`, error);
// 可以在这里实现重试逻辑
}
}
},
});
}
// 使用示例
const handlers = {
"user-events": async (payload) => {
console.log(`Processing user event: ${payload.eventType}`, payload.data);
// await userService.processEvent(payload.eventType, payload.data);
},
"order-events": async (payload) => {
console.log(`Processing order event: ${payload.eventType}`, payload.data);
// await orderService.processEvent(payload.eventType, payload.data);
},
};任务队列集成
Bull 队列(Redis)
javascript
// lib/queue/bull.js
import Bull from "bull";
const emailQueue = new Bull("email", {
redis: {
host: process.env.REDIS_HOST || "localhost",
port: process.env.REDIS_PORT || 6379,
password: process.env.REDIS_PASSWORD,
},
});
const imageProcessQueue = new Bull("image-process", {
redis: {
host: process.env.REDIS_HOST || "localhost",
port: process.env.REDIS_PORT || 6379,
},
});
// 邮件任务
emailQueue.process(async (job) => {
const { to, subject, body, attachments } = job.data;
console.log(`Processing email job ${job.id}: to=${to}`);
// 实际发送邮件
// await sendEmail({ to, subject, body, attachments });
return { sent: true, to };
});
// 图片处理任务
imageProcessQueue.process(async (job) => {
const { imageId, operations } = job.data;
console.log(`Processing image ${imageId}:`, operations);
// 图片处理逻辑
// await processImage(imageId, operations);
return { processed: true, imageId };
});
// 添加任务
export async function addEmailJob(emailData, options = {}) {
return emailQueue.add(emailData, {
attempts: 3,
backoff: {
type: "exponential",
delay: 1000,
},
...options,
});
}
export async function addImageProcessJob(imageData, options = {}) {
return imageProcessQueue.add(imageData, {
priority: imageData.priority || 0,
attempts: 2,
timeout: 30000,
...options,
});
}
// 事件监听
emailQueue.on("completed", (job, result) => {
console.log(`Email job ${job.id} completed:`, result);
});
emailQueue.on("failed", (job, error) => {
console.error(`Email job ${job.id} failed:`, error.message);
});Express 集成
路由中使用队列
javascript
// routes/user.js
import express from "express";
import { publishUserEvent } from "../lib/queue/rabbitmq/producer.js";
import { addEmailJob } from "../lib/queue/bull.js";
const router = express.Router();
// 创建用户
router.post("/users", async (req, res) => {
try {
const user = await UserService.create(req.body);
// 发布用户创建事件(异步)
await publishUserEvent("user.created", {
id: user.id,
email: user.email,
username: user.username,
});
// 发送欢迎邮件(异步任务)
await addEmailJob({
to: user.email,
subject: "欢迎注册",
body: `您好 ${user.username},欢迎注册!`,
});
res.json({ success: true, data: user });
} catch (error) {
res.status(500).json({ error: error.message });
}
});
export default router;常见模式
1. 发布/订阅模式
javascript
// RabbitMQ 发布/订阅
// 多个消费者可以订阅同一个交换机,收到相同的消息
await channel.assertExchange("notifications", "fanout", { durable: true });2. 路由模式
javascript
// 根据路由键精确匹配
// user.created -> 发送到用户创建处理器
// user.updated -> 发送到用户更新处理器
await channel.assertExchange("user", "topic", { durable: true });
await channel.bindQueue(userQueue, "user", "user.created");3. 工作队列模式
javascript
// 多个消费者共享一个队列,消息在消费者间轮询分发
await channel.assertQueue("tasks", { durable: true });
// 多个 worker 同时消费消息队列注意事项
- 消息处理要保证幂等性
- 设置合理的重试机制
- 监控队列堆积情况
- 考虑消息顺序性需求
总结
使用 Claude Code 实现消息队列:
- RabbitMQ 适合中小型应用
- Kafka 适合高吞吐量场景
- Bull 适合简单的任务队列
- 合理设计消息格式和路由
- 实现重试和错误处理
- 监控队列健康状态