零度AI
高级 · 20 分钟阅读

Claude Code WebSocket实时通信

使用 Claude Code 实现 WebSocket 实时通信,包括聊天、推送、在线状态等功能

标签:Claude Code、WebSocket、实时通信、Socket.io、推送

WebSocket 概述

WebSocket 提供了客户端和服务端之间的双向实时通信能力。Claude Code 可以帮你快速实现各种实时功能。

WebSocket vs HTTP

HTTP 是请求-响应模式,WebSocket 建立后可以双向通信,适合聊天、实时协作、监控推送等场景。

Socket.io 服务器

基础配置

bash
帮我创建一个使用 Socket.io 的实时聊天服务器。
javascript
// lib/socket/index.js
import { Server } from "socket.io";
import jwt from "jsonwebtoken";
import onlineUsers from "./onlineUsers.js";

let io;

/**
 * Create the Socket.io server on top of an existing HTTP server, install the
 * JWT authentication middleware and register the per-connection handlers.
 *
 * The created server is also stored in the module-level `io` variable.
 *
 * @param {import("http").Server} httpServer - HTTP server to attach Socket.io to.
 * @returns {Server} The Socket.io server instance.
 */
export function initSocket(httpServer) {
  io = new Server(httpServer, {
    cors: {
      origin: process.env.CLIENT_URL || "http://localhost:3000",
      methods: ["GET", "POST"],
      credentials: true,
    },
    pingTimeout: 60000,
    pingInterval: 25000,
  });

  // Authentication middleware: every connection must present a valid JWT.
  io.use((socket, next) => {
    const token = socket.handshake.auth.token;

    if (!token) {
      return next(new Error("未提供认证令牌"));
    }

    let decoded;
    try {
      decoded = jwt.verify(token, process.env.JWT_SECRET);
    } catch {
      // The verification detail is deliberately not sent to the client.
      return next(new Error("认证失败"));
    }

    // A verified token without a userId would put every such socket into the
    // same `user:undefined` room and presence entry, so reject it.
    if (decoded === null || typeof decoded !== "object" || decoded.userId == null) {
      return next(new Error("认证失败"));
    }

    socket.userId = decoded.userId;
    socket.username = decoded.username;
    // Called outside the try block so an exception thrown further down the
    // middleware chain is not misreported as an authentication failure.
    return next();
  });

  io.on("connection", (socket) => {
    console.log(`用户连接: ${socket.username} (${socket.id})`);

    // Remember whether the user already had another socket open before
    // registering this one.
    const wasOnline = onlineUsers.isOnline(socket.userId);
    onlineUsers.add(socket.userId, socket.id);

    // Announce presence only for the user's first socket; the disconnect
    // handler likewise announces "offline" only when the last socket closes.
    if (!wasOnline) {
      socket.broadcast.emit("user:online", {
        userId: socket.userId,
        username: socket.username,
      });
    }

    // Join the per-user room.
    socket.join(`user:${socket.userId}`);

    // Messages
    socket.on("message:send", handleMessage(socket));
    socket.on("message:read", handleMessageRead(socket));

    // Friend requests
    socket.on("friend:request", handleFriendRequest(socket));
    socket.on("friend:accept", handleFriendAccept(socket));
    socket.on("friend:reject", handleFriendReject(socket));

    // Typing indicators
    socket.on("typing:start", handleTypingStart(socket));
    socket.on("typing:stop", handleTypingStop(socket));

    // Disconnect
    socket.on("disconnect", handleDisconnect(socket));
  });

  return io;
}

// lib/socket/onlineUsers.js

/**
 * In-memory registry of online users.
 *
 * A user can have several sockets open at once, so each user id maps to the
 * Set of socket ids registered for it. A user counts as online while that
 * Set is non-empty.
 */
const onlineUsers = {
  // Map from user id to the Set of socket ids registered for that user.
  users: new Map(),

  /**
   * Register a socket for a user, creating the user's entry if needed.
   * @param {*} userId
   * @param {string} socketId
   */
  add(userId, socketId) {
    let sockets = this.users.get(userId);
    if (!sockets) {
      sockets = new Set();
      this.users.set(userId, sockets);
    }
    sockets.add(socketId);
  },

  /**
   * Unregister a socket; the user's entry is dropped once no socket is left.
   * Unknown users are ignored.
   * @param {*} userId
   * @param {string} socketId
   */
  remove(userId, socketId) {
    const sockets = this.users.get(userId);
    if (!sockets) {
      return;
    }
    sockets.delete(socketId);
    if (sockets.size === 0) {
      this.users.delete(userId);
    }
  },

  /**
   * @param {*} userId
   * @returns {Set<string>} The user's socket ids (the live internal Set), or
   *   a fresh empty Set when the user has no registered socket.
   */
  getSockets(userId) {
    return this.users.get(userId) || new Set();
  },

  /**
   * @param {*} userId
   * @returns {boolean} True while at least one socket is registered.
   */
  isOnline(userId) {
    return this.users.has(userId);
  },
};

// The socket entry module imports this registry as the default export of
// "./onlineUsers.js"; declaring it a second time there would clash with
// that import.
export default onlineUsers;

消息处理

javascript
// lib/socket/handlers/message.js

/**
 * Build the "message:send" listener for one socket.
 *
 * The listener stores the message, forwards it to every socket the recipient
 * currently has registered, and acknowledges the sender with "message:sent".
 * Any failure is reported to the sender as "message:error" and never escapes
 * as an unhandled promise rejection.
 *
 * @param {object} socket - Authenticated socket; `userId`, `username` and
 *   `emit` are used.
 * @returns {(data: object) => Promise<void>} Event listener.
 */
export function handleMessage(socket) {
  return async (data) => {
    try {
      // The payload comes from the client and may be missing entirely;
      // validate it inside the try block so the failure reaches the sender.
      if (data === null || typeof data !== "object") {
        throw new TypeError("消息格式无效");
      }
      const { to, content, type = "text", metadata = {} } = data;

      // Persist the message first.
      const message = await Message.create({
        from: socket.userId,
        to,
        content,
        type,
        metadata,
        status: "sent",
      });

      // Payload delivered to the recipient.
      const messageData = {
        id: message.id,
        from: {
          id: socket.userId,
          username: socket.username,
        },
        to,
        content,
        type,
        metadata,
        createdAt: message.createdAt,
      };

      // Forward to every socket of the recipient (none when offline).
      const receiverSockets = onlineUsers.getSockets(to);
      receiverSockets.forEach((socketId) => {
        io.to(socketId).emit("message:receive", messageData);
      });

      // Acknowledge the sender. Only claim "delivered" when at least one
      // recipient socket got the message; otherwise it is merely stored.
      const deliveredTo = receiverSockets.size;
      socket.emit("message:sent", {
        id: message.id,
        status: deliveredTo > 0 ? "delivered" : "sent",
        deliveredTo,
      });
    } catch (error) {
      socket.emit("message:error", {
        id: data?.tempId,
        error: error.message,
      });
    }
  };
}

/**
 * Build the "message:read" listener for one socket.
 *
 * The listener marks the given messages addressed to this user as read and
 * notifies every socket of the original sender. Failures are reported to the
 * caller as "message:error" and never escape as an unhandled rejection.
 *
 * @param {object} socket - Authenticated socket; `userId` and `emit` are used.
 * @returns {(data: object) => Promise<void>} Event listener.
 */
export function handleMessageRead(socket) {
  return async (data) => {
    try {
      const { messageIds, from } = data ?? {};

      // The ids are client input and go straight into a `$in` filter, which
      // only accepts an array.
      if (!Array.isArray(messageIds)) {
        socket.emit("message:error", { error: "messageIds 必须是数组" });
        return;
      }

      // The `to` condition restricts the update to this user's own messages.
      await Message.updateMany(
        { _id: { $in: messageIds }, to: socket.userId },
        { status: "read" }
      );

      // Notify the sender. One timestamp is shared by all of the sender's
      // sockets so every tab shows the same read time.
      // NOTE(review): `from` is taken from the client payload and is not
      // checked against the stored messages — confirm whether that is intended.
      const readAt = new Date();
      const senderSockets = onlineUsers.getSockets(from);
      senderSockets.forEach((socketId) => {
        io.to(socketId).emit("message:read", {
          messageIds,
          readBy: socket.userId,
          readAt,
        });
      });
    } catch (error) {
      socket.emit("message:error", { error: error.message });
    }
  };
}

断开连接处理

javascript
// lib/socket/handlers/disconnect.js

/**
 * Build the "disconnect" listener for one socket.
 *
 * The listener logs the disconnect, unregisters the socket from the online
 * registry and, when that was the user's last socket, broadcasts
 * "user:offline" to the other clients.
 *
 * @param {object} socket - The socket that is going away.
 * @returns {(reason: string) => void} Event listener.
 */
export function handleDisconnect(socket) {
  return (reason) => {
    const { userId, username, id: socketId } = socket;

    console.log(`用户断开: ${username} (${socketId}) - ${reason}`);

    // Drop this socket from the registry.
    onlineUsers.remove(userId, socketId);

    // Another socket of the same user is still open: nothing to announce.
    if (onlineUsers.isOnline(userId)) {
      return;
    }

    // That was the last socket, so the user is now fully offline.
    socket.broadcast.emit("user:offline", { userId, username });
  };
}

Express 集成

路由配置

javascript
// app.js
import express from "express";
import { createServer } from "http";
import { initSocket } from "./lib/socket/index.js";
// The /health handler below reads the online-user registry, so it has to be
// imported here; it is the same module the socket layer imports.
import onlineUsers from "./lib/socket/onlineUsers.js";
import authRoutes from "./routes/auth.js";
import chatRoutes from "./routes/chat.js";
import { metricsMiddleware } from "./lib/middleware/metrics.js";

// Express handles plain HTTP requests; Socket.io is attached to the same
// underlying HTTP server, so both are served from one port.
const app = express();
const httpServer = createServer(app);

// Initialise Socket.io
initSocket(httpServer);

// Middleware
app.use(express.json());
app.use(metricsMiddleware);

// REST API routes
app.use("/api/auth", authRoutes);
app.use("/api/chat", chatRoutes);

// Health check: also reports how many distinct users are currently online.
app.get("/health", (req, res) => {
  res.json({ status: "ok", onlineUsers: onlineUsers.users.size });
});

// Start listening on the shared HTTP server (not on `app`), otherwise the
// Socket.io server would not receive any connection.
const PORT = process.env.PORT || 3000;
httpServer.listen(PORT, () => {
  console.log(`Server running on port ${PORT}`);
});

客户端实现

React 客户端

javascript
// hooks/useSocket.js
import { useEffect, useRef, useState, useCallback } from "react";
import io from "socket.io-client";

/**
 * React hook that owns one Socket.io connection for the signed-in user.
 *
 * A socket is opened while `userId` is truthy and closed when it changes or
 * the component unmounts. `messages` only grows from "message:receive"
 * events; messages sent through `sendMessage` are not appended by this hook.
 *
 * @param {*} userId - Current user's id; no connection is made while falsy.
 * @returns {{
 *   isConnected: boolean,
 *   messages: object[],
 *   typingUsers: Object<string, string>,
 *   sendMessage: (to: *, content: *, type?: string, metadata?: object) => void,
 *   startTyping: (to: *) => void,
 *   stopTyping: (to: *) => void,
 * }}
 */
export function useSocket(userId) {
  const socketRef = useRef(null);
  const [isConnected, setIsConnected] = useState(false);
  const [messages, setMessages] = useState([]);
  const [typingUsers, setTypingUsers] = useState({});

  useEffect(() => {
    if (!userId) return undefined;

    const socket = io(process.env.NEXT_PUBLIC_API_URL, {
      auth: { token: localStorage.getItem("token") },
      transports: ["websocket"],
    });

    socketRef.current = socket;

    socket.on("connect", () => {
      console.log("Socket connected");
      setIsConnected(true);
    });

    socket.on("disconnect", (reason) => {
      console.log("Socket disconnected:", reason);
      setIsConnected(false);
    });

    // Incoming message
    socket.on("message:receive", (message) => {
      setMessages((prev) => [...prev, message]);
      // Audible notification
      playNotificationSound();
    });

    // Read receipts: prefer the timestamp sent with the event so every
    // client shows the same read time; fall back to the local clock when it
    // is missing or unparsable.
    socket.on("message:read", ({ messageIds, readAt }) => {
      const parsed = readAt == null ? null : new Date(readAt);
      const readTime =
        parsed !== null && !Number.isNaN(parsed.getTime()) ? parsed : new Date();

      setMessages((prev) =>
        prev.map((msg) =>
          messageIds.includes(msg.id)
            ? { ...msg, status: "read", readAt: readTime }
            : msg
        )
      );
    });

    // Typing indicators, keyed by the id of the user who is typing.
    socket.on("typing:update", ({ userId: typingUserId, username, isTyping }) => {
      setTypingUsers((prev) => {
        if (isTyping) {
          return { ...prev, [typingUserId]: username };
        }
        const { [typingUserId]: _removed, ...rest } = prev;
        return rest;
      });
    });

    return () => {
      socket.disconnect();
      // Drop the reference so the emit helpers below cannot write to a
      // socket that was deliberately closed.
      if (socketRef.current === socket) {
        socketRef.current = null;
      }
    };
  }, [userId]);

  const sendMessage = useCallback((to, content, type = "text", metadata = {}) => {
    socketRef.current?.emit("message:send", { to, content, type, metadata });
  }, []);

  const startTyping = useCallback((to) => {
    socketRef.current?.emit("typing:start", { to });
  }, []);

  const stopTyping = useCallback((to) => {
    socketRef.current?.emit("typing:stop", { to });
  }, []);

  return {
    isConnected,
    messages,
    typingUsers,
    sendMessage,
    startTyping,
    stopTyping,
  };
}

聊天组件

jsx
// components/Chat/ChatRoom.jsx
import { useState, useRef, useEffect } from "react";
import { useSocket } from "../../hooks/useSocket";

/**
 * One-to-one chat view between the current user and a single recipient.
 *
 * @param {object} props
 * @param {*} props.recipientId - Id of the user being chatted with.
 * @param {string} [props.recipientName] - Recipient's display name (not
 *   rendered by this component).
 * @param {*} props.currentUserId - Id of the signed-in user. The component
 *   uses this id for the socket connection and for telling sent messages from
 *   received ones, so it has to be supplied by the parent.
 */
export default function ChatRoom({ recipientId, recipientName, currentUserId }) {
  const { isConnected, messages, typingUsers, sendMessage, startTyping, stopTyping } =
    useSocket(currentUserId);
  const [input, setInput] = useState("");
  const messagesEndRef = useRef(null);
  const typingTimeoutRef = useRef(null);

  // Keep the newest message in view.
  useEffect(() => {
    messagesEndRef.current?.scrollIntoView({ behavior: "smooth" });
  }, [messages]);

  // Cancel a pending "stop typing" timer on unmount so it cannot fire after
  // the component is gone.
  useEffect(() => () => clearTimeout(typingTimeoutRef.current), []);

  const clearTypingTimer = () => {
    if (typingTimeoutRef.current) {
      clearTimeout(typingTimeoutRef.current);
      typingTimeoutRef.current = null;
    }
  };

  // Input handling
  const handleInput = (e) => {
    setInput(e.target.value);

    // Tell the recipient that the user is typing.
    startTyping(recipientId);

    // Debounce: report "stopped typing" after 2 s without a keystroke.
    clearTypingTimer();
    typingTimeoutRef.current = setTimeout(() => {
      typingTimeoutRef.current = null;
      stopTyping(recipientId);
    }, 2000);
  };

  // Send the current input
  const handleSend = () => {
    if (!input.trim()) return;

    sendMessage(recipientId, input.trim());
    setInput("");
    // Cancel the debounce timer first, otherwise "typing:stop" is emitted a
    // second time when it expires.
    clearTypingTimer();
    stopTyping(recipientId);
  };

  // Enter sends, except while an IME composition is in progress, where Enter
  // only confirms the candidate text.
  const handleKeyDown = (e) => {
    if (e.key === "Enter" && !e.nativeEvent.isComposing) {
      handleSend();
    }
  };

  // Only the conversation between the two participants is shown.
  const conversation = messages.filter(
    (m) =>
      (m.from.id === currentUserId && m.to === recipientId) ||
      (m.from.id === recipientId && m.to === currentUserId)
  );

  return (
    <div className="chat-room">
      <div className="messages">
        {conversation.map((msg) => (
          <div
            key={msg.id}
            className={`message ${msg.from.id === currentUserId ? "sent" : "received"}`}
          >
            <div className="content">{msg.content}</div>
            <div className="meta">
              {new Date(msg.createdAt).toLocaleTimeString()}
              {msg.from.id === currentUserId && (
                <span className="status">{msg.status}</span>
              )}
            </div>
          </div>
        ))}

        {typingUsers[recipientId] && (
          <div className="typing-indicator">
            {typingUsers[recipientId]} 正在输入...
          </div>
        )}

        <div ref={messagesEndRef} />
      </div>

      <div className="input-area">
        <input
          type="text"
          value={input}
          onChange={handleInput}
          onKeyDown={handleKeyDown}
          placeholder="输入消息..."
        />
        <button onClick={handleSend} disabled={!input.trim()}>
          发送
        </button>
      </div>
    </div>
  );
}

实时状态同步

在线状态

javascript
// Server side - report which of the user's friends are currently online.
// The listener must be async because it awaits the friend lookup; a failure
// is reported back to the requesting client instead of being left as an
// unhandled rejection.
socket.on("friends:get-online", async () => {
  try {
    const friends = await Friend.list(socket.userId);
    const onlineStatus = friends.map((friend) => ({
      userId: friend.id,
      username: friend.username,
      isOnline: onlineUsers.isOnline(friend.id),
    }));

    socket.emit("friends:online-status", onlineStatus);
  } catch (error) {
    socket.emit("friends:error", { error: error.message });
  }
});

// Client side - keep the local set of online users in sync with the
// presence events.
useEffect(() => {
  if (!socket) return undefined;

  const handleOnline = ({ userId }) => {
    setOnlineUsers((prev) => new Set([...prev, userId]));
  };

  const handleOffline = ({ userId }) => {
    setOnlineUsers((prev) => {
      const next = new Set(prev);
      next.delete(userId);
      return next;
    });
  };

  socket.on("user:online", handleOnline);
  socket.on("user:offline", handleOffline);

  // Remove exactly these listeners on cleanup; without this every re-run of
  // the effect would stack another pair on the same socket.
  return () => {
    socket.off("user:online", handleOnline);
    socket.off("user:offline", handleOffline);
  };
}, [socket]);

最佳实践

1. 心跳保活

javascript
// Client side: application-level heartbeat.
const PING_INTERVAL = 25000;

useEffect(() => {
  if (!socket) return;

  // Emit "ping" on every tick, but only while the socket is connected.
  const sendPing = () => {
    if (!socket.connected) {
      return;
    }
    socket.emit("ping");
  };

  const timerId = setInterval(sendPing, PING_INTERVAL);

  // Stop the timer when the socket changes or the component unmounts.
  return () => {
    clearInterval(timerId);
  };
}, [socket]);

2. 断线重连

javascript
// Client reconnection logic
socket.on("connect_error", (error) => {
  console.log("Connection error:", error.message);
});

// Since Socket.IO v3 the "reconnect" event is emitted by the Manager
// (`socket.io`), not by the Socket itself, so a listener registered with
// `socket.on("reconnect", ...)` would never be called.
socket.io.on("reconnect", (attemptNumber) => {
  console.log("Reconnected after", attemptNumber, "attempts");
  // Re-subscribe to rooms and channels here.
});

3. 消息队列

javascript
// Offline message queue

/**
 * Push a message onto the recipient's offline list in Redis.
 *
 * The message is stored as JSON with a `queuedAt` timestamp (milliseconds
 * since the epoch) that overrides any `queuedAt` already on the message.
 *
 * @param {*} to - Recipient id; the list key is `offline:<to>`.
 * @param {object} message - Message to queue.
 * @returns {Promise<void>}
 */
export async function queueOfflineMessage(to, message) {
  const listKey = `offline:${to}`;
  const serialized = JSON.stringify({ ...message, queuedAt: Date.now() });

  await Redis.lpush(listKey, serialized);
}

注意

WebSocket 连接数受服务器资源限制,大规模应用需要考虑使用专门的实时通信服务(如 Pusher、Ably)或分布式架构。

总结

使用 Claude Code 实现 WebSocket 实时通信:

  • 使用 Socket.io 简化开发
  • 实现完整的消息系统
  • 处理连接状态和异常
  • 添加 typing 等交互反馈
  • 实现消息重连和离线支持
  • 注意安全认证和性能优化