WebSocket 概述
WebSocket 提供了客户端和服务端之间的双向实时通信能力。Claude Code 可以帮你快速实现各种实时功能。
WebSocket vs HTTP
HTTP 是请求-响应模式,WebSocket 建立后可以双向通信,适合聊天、实时协作、监控推送等场景。
Socket.io 服务器
基础配置
bash
帮我创建一个使用 Socket.io 的实时聊天服务器。javascript
// lib/socket/index.js
import { Server } from "socket.io";
import jwt from "jsonwebtoken";
import onlineUsers from "./onlineUsers.js";
let io;
export function initSocket(httpServer) {
io = new Server(httpServer, {
cors: {
origin: process.env.CLIENT_URL || "http://localhost:3000",
methods: ["GET", "POST"],
credentials: true,
},
pingTimeout: 60000,
pingInterval: 25000,
});
// 认证中间件
io.use((socket, next) => {
const token = socket.handshake.auth.token;
if (!token) {
return next(new Error("未提供认证令牌"));
}
try {
const decoded = jwt.verify(token, process.env.JWT_SECRET);
socket.userId = decoded.userId;
socket.username = decoded.username;
next();
} catch (error) {
next(new Error("认证失败"));
}
});
io.on("connection", (socket) => {
console.log(`用户连接: ${socket.username} (${socket.id})`);
// 添加到在线用户
onlineUsers.add(socket.userId, socket.id);
// 广播用户上线
socket.broadcast.emit("user:online", {
userId: socket.userId,
username: socket.username,
});
// 加入用户房间
socket.join(`user:${socket.userId}`);
// 消息处理
socket.on("message:send", handleMessage(socket));
socket.on("message:read", handleMessageRead(socket));
// 好友请求
socket.on("friend:request", handleFriendRequest(socket));
socket.on("friend:accept", handleFriendAccept(socket));
socket.on("friend:reject", handleFriendReject(socket));
// typing 状态
socket.on("typing:start", handleTypingStart(socket));
socket.on("typing:stop", handleTypingStop(socket));
// 断开连接
socket.on("disconnect", handleDisconnect(socket));
});
return io;
}
// 在线用户管理
const onlineUsers = {
users: new Map(),
add(userId, socketId) {
if (!this.users.has(userId)) {
this.users.set(userId, new Set());
}
this.users.get(userId).add(socketId);
},
remove(userId, socketId) {
if (this.users.has(userId)) {
this.users.get(userId).delete(socketId);
if (this.users.get(userId).size === 0) {
this.users.delete(userId);
}
}
},
getSockets(userId) {
return this.users.get(userId) || new Set();
},
isOnline(userId) {
return this.users.has(userId);
},
};消息处理
javascript
// lib/socket/handlers/message.js
export function handleMessage(socket) {
return async (data) => {
const { to, content, type = "text", metadata = {} } = data;
try {
// 保存消息到数据库
const message = await Message.create({
from: socket.userId,
to,
content,
type,
metadata,
status: "sent",
});
// 构建消息响应
const messageData = {
id: message.id,
from: {
id: socket.userId,
username: socket.username,
},
to,
content,
type,
metadata,
createdAt: message.createdAt,
};
// 发送给接收者(如果在线)
const receiverSockets = onlineUsers.getSockets(to);
receiverSockets.forEach((socketId) => {
io.to(socketId).emit("message:receive", messageData);
});
// 发送回执给发送者
socket.emit("message:sent", {
id: message.id,
status: "delivered",
deliveredTo: receiverSockets.size,
});
} catch (error) {
socket.emit("message:error", {
id: data.tempId,
error: error.message,
});
}
};
}
export function handleMessageRead(socket) {
return async (data) => {
const { messageIds, from } = data;
await Message.updateMany(
{ _id: { $in: messageIds }, to: socket.userId },
{ status: "read" }
);
// 通知发送者消息已读
const senderSockets = onlineUsers.getSockets(from);
senderSockets.forEach((socketId) => {
io.to(socketId).emit("message:read", {
messageIds,
readBy: socket.userId,
readAt: new Date(),
});
});
};
}断开连接处理
javascript
// lib/socket/handlers/disconnect.js
export function handleDisconnect(socket) {
return (reason) => {
console.log(`用户断开: ${socket.username} (${socket.id}) - ${reason}`);
// 从在线用户中移除
onlineUsers.remove(socket.userId, socket.id);
// 如果用户完全离线
if (!onlineUsers.isOnline(socket.userId)) {
// 广播用户离线
socket.broadcast.emit("user:offline", {
userId: socket.userId,
username: socket.username,
});
}
};
}Express 集成
路由配置
javascript
// app.js
import express from "express";
import { createServer } from "http";
import { initSocket } from "./lib/socket/index.js";
import authRoutes from "./routes/auth.js";
import chatRoutes from "./routes/chat.js";
import { metricsMiddleware } from "./lib/middleware/metrics.js";
const app = express();
const httpServer = createServer(app);
// 初始化 Socket.io
initSocket(httpServer);
// 中间件
app.use(express.json());
app.use(metricsMiddleware);
// REST API 路由
app.use("/api/auth", authRoutes);
app.use("/api/chat", chatRoutes);
// 健康检查
app.get("/health", (req, res) => {
res.json({ status: "ok", onlineUsers: onlineUsers.users.size });
});
// 启动服务器
const PORT = process.env.PORT || 3000;
httpServer.listen(PORT, () => {
console.log(`Server running on port ${PORT}`);
});客户端实现
React 客户端
javascript
// hooks/useSocket.js
import { useEffect, useRef, useState, useCallback } from "react";
import io from "socket.io-client";
export function useSocket(userId) {
const socketRef = useRef(null);
const [isConnected, setIsConnected] = useState(false);
const [messages, setMessages] = useState([]);
const [typingUsers, setTypingUsers] = useState({});
useEffect(() => {
if (!userId) return;
const socket = io(process.env.NEXT_PUBLIC_API_URL, {
auth: { token: localStorage.getItem("token") },
transports: ["websocket"],
});
socketRef.current = socket;
socket.on("connect", () => {
console.log("Socket connected");
setIsConnected(true);
});
socket.on("disconnect", (reason) => {
console.log("Socket disconnected:", reason);
setIsConnected(false);
});
// 消息接收
socket.on("message:receive", (message) => {
setMessages((prev) => [...prev, message]);
// 可以播放提示音
playNotificationSound();
});
// 消息已读
socket.on("message:read", ({ messageIds, readBy }) => {
setMessages((prev) =>
prev.map((msg) =>
messageIds.includes(msg.id)
? { ...msg, status: "read", readAt: new Date() }
: msg
)
);
});
// typing 状态
socket.on("typing:update", ({ userId, username, isTyping }) => {
setTypingUsers((prev) => {
if (isTyping) {
return { ...prev, [userId]: username };
} else {
const { [userId]: _, ...rest } = prev;
return rest;
}
});
});
return () => {
socket.disconnect();
};
}, [userId]);
const sendMessage = useCallback((to, content, type = "text", metadata = {}) => {
if (socketRef.current) {
socketRef.current.emit("message:send", { to, content, type, metadata });
}
}, []);
const startTyping = useCallback((to) => {
if (socketRef.current) {
socketRef.current.emit("typing:start", { to });
}
}, []);
const stopTyping = useCallback((to) => {
if (socketRef.current) {
socketRef.current.emit("typing:stop", { to });
}
}, []);
return {
isConnected,
messages,
typingUsers,
sendMessage,
startTyping,
stopTyping,
};
}聊天组件
jsx
// components/Chat/ChatRoom.jsx
import { useState, useRef, useEffect } from "react";
import { useSocket } from "../../hooks/useSocket";
export default function ChatRoom({ recipientId, recipientName }) {
const { isConnected, messages, typingUsers, sendMessage, startTyping, stopTyping } =
useSocket(currentUserId);
const [input, setInput] = useState("");
const messagesEndRef = useRef(null);
const typingTimeoutRef = useRef(null);
// 自动滚动到最新消息
useEffect(() => {
messagesEndRef.current?.scrollIntoView({ behavior: "smooth" });
}, [messages]);
// 处理输入
const handleInput = (e) => {
setInput(e.target.value);
// 发送 typing 状态
startTyping(recipientId);
// 防抖:停止 typing
if (typingTimeoutRef.current) {
clearTimeout(typingTimeoutRef.current);
}
typingTimeoutRef.current = setTimeout(() => {
stopTyping(recipientId);
}, 2000);
};
// 发送消息
const handleSend = () => {
if (!input.trim()) return;
sendMessage(recipientId, input.trim());
setInput("");
stopTyping(recipientId);
};
return (
<div className="chat-room">
<div className="messages">
{messages
.filter(
(m) =>
(m.from.id === currentUserId && m.to === recipientId) ||
(m.from.id === recipientId && m.to === currentUserId)
)
.map((msg) => (
<div
key={msg.id}
className={`message ${msg.from.id === currentUserId ? "sent" : "received"}`}
>
<div className="content">{msg.content}</div>
<div className="meta">
{new Date(msg.createdAt).toLocaleTimeString()}
{msg.from.id === currentUserId && (
<span className="status">{msg.status}</span>
)}
</div>
</div>
))}
{typingUsers[recipientId] && (
<div className="typing-indicator">
{typingUsers[recipientId]} 正在输入...
</div>
)}
<div ref={messagesEndRef} />
</div>
<div className="input-area">
<input
type="text"
value={input}
onChange={handleInput}
onKeyPress={(e) => e.key === "Enter" && handleSend()}
placeholder="输入消息..."
/>
<button onClick={handleSend} disabled={!input.trim()}>
发送
</button>
</div>
</div>
);
}实时状态同步
在线状态
javascript
// 服务器端 - 在线用户列表
socket.on("friends:get-online", () => {
const friends = await Friend.list(socket.userId);
const onlineStatus = friends.map((friend) => ({
userId: friend.id,
username: friend.username,
isOnline: onlineUsers.isOnline(friend.id),
}));
socket.emit("friends:online-status", onlineStatus);
});
// 客户端 - 订阅好友状态
useEffect(() => {
if (!socket) return;
socket.on("user:online", ({ userId }) => {
setOnlineUsers((prev) => new Set([...prev, userId]));
});
socket.on("user:offline", ({ userId }) => {
setOnlineUsers((prev) => {
const next = new Set(prev);
next.delete(userId);
return next;
});
});
}, [socket]);最佳实践
1. 心跳保活
javascript
// 客户端
const PING_INTERVAL = 25000;
useEffect(() => {
if (!socket) return;
const pingInterval = setInterval(() => {
if (socket.connected) {
socket.emit("ping");
}
}, PING_INTERVAL);
return () => clearInterval(pingInterval);
}, [socket]);2. 消息重连
javascript
// 客户端重连逻辑
socket.on("connect_error", (error) => {
console.log("Connection error:", error.message);
});
socket.on("reconnect", (attemptNumber) => {
console.log("Reconnected after", attemptNumber, "attempts");
// 重新订阅房间和频道
});3. 消息队列
javascript
// 离线消息队列
export async function queueOfflineMessage(to, message) {
await Redis.lpush(
`offline:${to}`,
JSON.stringify({
...message,
queuedAt: Date.now(),
})
);
}注意
WebSocket 连接数受服务器资源限制,大规模应用需要考虑使用专门的实时通信服务(如 Pusher、Ably)或分布式架构。
总结
使用 Claude Code 实现 WebSocket 实时通信:
- 使用 Socket.io 简化开发
- 实现完整的消息系统
- 处理连接状态和异常
- 添加 typing 等交互反馈
- 实现消息重连和离线支持
- 注意安全认证和性能优化