Administrator
Administrator
发布于 2025-08-23 / 15 阅读
0
0

基于WebSocket和观察者模式的排队系统设计与实现

基于WebSocket和观察者模式的排队系统设计与实现

在高并发系统中,当我们需要限制对某些昂贵或有限资源的访问时(例如,AI服务、实时转码、数据库连接池等),一个健壮的排队系统是不可或缺的。本文将通过分析一个具体的Java后端实现,深入探讨如何利用WebSocket构建一个实时的、可扩展的排队服务,并重点介绍如何通过观察者模式实现模块间的优雅解耦。


一、 整体设计思路与架构

系统的核心目标是:允许多个用户通过客户端连接到服务,但只允许有限数量(例如 MAX_ACTIVE_USERS)的用户同时访问核心资源。超出此数量的用户需要进入一个等待队列,并在有用户离开时按顺序获得访问权限。整个过程需要对用户实时透明。

基于这个目标,我们选择了以下技术栈和架构:

  1. WebSocket: 用于实现客户端与服务器之间的全双工实时通信。这使得服务器可以主动向排队中的用户推送他们的位置更新,或在轮到他们时立即授予访问权限,提供了极佳的用户体验。
  2. Spring Boot: 作为后端开发框架,集成了强大的WebSocket支持。
  3. 内存队列: 使用线程安全的数据结构(如 ConcurrentHashMapConcurrentLinkedQueue)在服务器内存中管理“活动用户”和“等待队列”,实现高效的状态管理。
  4. 观察者模式: 用于解耦排队服务与其他业务服务(如ASR服务)。当一个用户的会话结束时,排队服务能够通知其他关心此事件的服务进行相应的资源清理,而无需硬编码依赖。

整个系统的交互流程如下:

  1. 客户端携带认证Token,通过WebSocket连接到排队服务的 /api/v1/queue 端点。
  2. 服务器通过 HandshakeInterceptor 拦截握手请求,验证Token的有效性。
  3. 验证通过后,QueueWebSocketHandler 接收连接,并将其交由核心的 QueueService 处理。
  4. QueueService 判断当前活动用户数是否已满:
    • 未满: 将用户设为活动状态,并发送 ACCESS_GRANTED 消息。
    • 已满: 将用户加入等待队列,并广播 QUEUE_UPDATE 消息给所有排队者。
  5. 当一个活动用户断开连接时,QueueService 将其移除,并从队列头部取出一个用户,授予其访问权限。
  6. 同时,QueueService 通过观察者模式通知其他订阅了“会话结束”事件的服务(如 ASRProxyService),以便它们可以清理相关资源。

二、 核心实现深度解析

1. 连接建立与身份认证

任何WebSocket连接的第一步都是建立连接(握手)。为了确保只有授权用户才能进入排队系统,我们在握手阶段就进行身份验证。这是通过实现Spring的 HandshakeInterceptor 接口完成的。

TokenAuthHandshakeInterceptor.java:

@Component
public class TokenAuthHandshakeInterceptor implements HandshakeInterceptor {

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        // 从URL查询参数中获取token
        String token = UriComponentsBuilder.fromUri(request.getURI()).build().getQueryParams().getFirst("token");

        if (token == null || token.trim().isEmpty()) {
            return false; // 拒绝连接
        }

        try {
            // 通过token获取用户ID
            String userId = (String) StpUtil.getLoginIdByToken(token);
            // 将用户信息存入WebSocket session的attributes中
            attributes.put("userId", userId);
            return true; // 允许连接
        } catch (NotLoginException e) {
            return false; // 拒绝连接
        }
    }
    // ...
}

这个拦截器在 beforeHandshake 方法中,从连接的URL(例如 ws://server.com/api/v1/queue?token=xxx)中提取token参数。然后,它使用一个认证框架(这里是Sa-Token)来验证token并解析出用户ID。最关键的一步是 attributes.put("userId", userId);,它将用户ID存入了WebSocket Session的属性中。这样,在后续的业务逻辑里,我们随时可以从Session中获取当前用户的身份。

最后,在 WebSocketConfig.java 中,我们将这个拦截器注册到 /api/v1/queue 路径上,使其生效。

2. 核心排队逻辑 (QueueService)

QueueService 是整个排队系统的“大脑”,它负责管理两个核心数据结构:

  • activeSessions: 一个 ConcurrentHashMap<String, WebSocketSession>,用于存储正在使用服务的活动用户。Key是userId,Value是他们的WebSocketSession。使用userId作为Key可以有效防止同一用户重复进入活动列表。
  • waitingQueue: 一个 ConcurrentLinkedQueue<WebSocketSession>,一个高性能的线程安全队列,用于存储正在等待的用户,严格保证先进-先出(FIFO)。

新用户连接处理 (handleConnectionEstablished)

public synchronized void handleConnectionEstablished(WebSocketSession session) {
    String userId = getUserId(session);

    // 检查用户是否已经存在于活动会话或等待队列中
    if (isUserAlreadyConnected(userId)) {
        // 发送重复连接错误并关闭新连接
        return;
    }

    if (activeSessions.size() < MAX_ACTIVE_USERS) {
        // 有空位,直接授予访问权限
        grantAccess(session);
    } else {
        // 无空位,加入排队队列
        addToQueue(session);
    }
}

这里的逻辑非常清晰:

  1. 防重入检查: isUserAlreadyConnected 方法会检查该 userId 是否已存在于 activeSessionswaitingQueue 中,防止同一用户重复排队。
  2. 分配逻辑: 如果活动用户数未达到上限,则调用 grantAccess 方法,将用户加入 activeSessions 并发送 ACCESS_GRANTED 消息。否则,调用 addToQueue 方法,将其加入 waitingQueue 并广播队列状态更新。

用户断开连接处理 (handleConnectionClosed)

public synchronized void handleConnectionClosed(WebSocketSession session) {
    String userId = getUserId(session);
    // ...
    if (activeSessions.containsKey(userId)) {
        // 确保断开的是当前存储的那个session
        if (activeSessions.get(userId).getId().equals(session.getId())) {
            activeSessions.remove(userId);
            logger.info("活动用户 {} 已离开, 释放一个名额.", userId);

            // 触发回调,通知所有监听器
            listeners.forEach(listener -> listener.onSessionEnd(userId));

            // 从队列中提拔一位用户
            promoteNextInQueue();
        }
    } else {
        // 如果是排队用户离开,则从队列中移除
        if (waitingQueue.remove(session)) {
            broadcastQueueUpdates();
        }
    }
}

这是系统的关键环节:

  1. 判断用户类型: 检查该用户是在 activeSessions 中还是在 waitingQueue 中。
  2. 释放名额: 如果是活动用户离开,就从 activeSessions 中移除,并调用 promoteNextInQueue() 方法,该方法会从 waitingQueue 头部取出一个用户并授予他访问权限。
  3. 触发回调: 这是观察者模式的核心,我们稍后详述。
  4. 更新队列: 如果是等待中的用户离开,只需将其从 waitingQueue 中移除,并向其他等待者广播最新的排队信息。

三、 基于观察者模式的服务解耦与架构思考

1. 资源服务的权限验证

在我们的系统中,QueueService 扮演着“守门人”的角色,而 ASRProxyService(语音识别代理服务)则是受保护的“资源”。当一个用户在通过排队获得访问资格后,他会尝试连接ASR服务的WebSocket端点。此时,ASRProxyService 必须再次进行验证,确保这个用户是“合法的”。

ASRProxyServicestartProxy 方法会执行这个检查:它会调用 queueService.isUserActive(userId) 方法,查询该用户ID当前是否存在于 QueueService 的活动用户列表 activeSessions 中。只有当用户处于活动状态时,ASRProxyService 才会为其建立到上游供应商的连接;否则,将拒绝连接。这保证了只有通过排队系统授权的用户才能消耗昂贵的ASR资源。

2. 使用观察者模式进行状态同步

验证了用户有权访问后,下一个问题是:当用户会话结束时,如何优雅地清理 ASRProxyService 中创建的各种资源(如与上游服务的连接)?

一种糟糕的设计是让 QueueService 在用户断开时,硬编码去调用 ASRProxyService 的清理方法。这会造成紧耦合

  • QueueService 将依赖 ASRProxyService,未来如果新增 TTSServiceImageService 等,都需要修改 QueueService 的代码。
  • 代码职责不清,QueueService 承担了它不应该关心的业务逻辑。

为了解决这个问题,我们引入了观察者模式

3. 定义事件监听器接口 (UserSessionEventListener)

我们首先定义一个抽象的“事件”,即“用户会话结束”。任何关心这个事件的模块,都应该能够“听到”它。为此,我们创建了一个接口:

UserSessionEventListener.java:

/**
 * 用户活动会话事件监听器接口
 * 这里就是用观察者模式来监听用户会话结束事件。
 */
public interface UserSessionEventListener {
    /**
     * 当一个用户的活动会话结束时调用
     * @param userId 结束会话的用户ID
     */
    void onSessionEnd(String userId);
}

这个接口非常简单,只定义了一个 onSessionEnd 方法。任何需要对用户会话结束做出反应的服务,都应该实现这个接口。

4. QueueService 作为主题(Subject)

QueueService 是事件的发布者(也称为“主题”或“被观察者”)。它维护一个监听器列表,并在事件发生时通知所有监听器。

QueueService.java:

public class QueueService {
    // ...
    private final List<UserSessionEventListener> listeners = new ArrayList<>();

    public void addUserSessionEventListener(UserSessionEventListener listener) {
        this.listeners.add(listener);
    }

    // 在 handleConnectionClosed 方法中...
    public synchronized void handleConnectionClosed(WebSocketSession session) {
        // ...
        if (activeSessions.containsKey(userId)) {
            // ...
            activeSessions.remove(userId);
            
            // 触发回调,通知所有监听器
            listeners.forEach(listener -> listener.onSessionEnd(userId));

            promoteNextInQueue();
        }
    }
}

QueueService 提供了 addUserSessionEventListener 方法,允许其他服务将自己“注册”为监听器。当 handleConnectionClosed 检测到一个活动用户离开时,它会遍历 listeners 列表,并调用每个监听器的 onSessionEnd 方法。

5. ASRProxyService 作为观察者(Observer)

ASRProxyService 是事件的订阅者(“观察者”)。它实现了 UserSessionEventListener 接口,并在应用启动时将自己注册到 QueueService

ASRProxyService.java:

@Service
public class ASRProxyService implements UserSessionEventListener {

    @Autowired
    public ASRProxyService(QueueService queueService) {
        // ...
    }

    @PostConstruct
    public void register() {
        queueService.addUserSessionEventListener(this);
    }

    @Override
    public void onSessionEnd(String userId) {
        // 清理与该用户相关的所有ASR代理
        stopProxy(userId);
    }
    
    // ... 其他ASR代理逻辑
}

通过 @PostConstruct 注解,ASRProxyService 在初始化后立即将自己注册为监听器。当 QueueService 发出 onSessionEnd 通知时,ASRProxyService 的同名方法就会被执行,从而触发资源清理。这种设计完美实现了开闭原则

6. 架构演进思考:从单体到微服务

当前的设计(服务间直接方法调用和内存中的监听器列表)在单体应用中非常高效。QueueServiceASRProxyService 在同一个进程中,通信成本极低。

但如果系统规模扩大,我们可能会考虑将排队系统设计为一个独立的微服务。在这种架构下,服务间的交互方式会发生根本变化:

  1. 鉴权方式改变: 当排队服务授予用户访问权限时,它不再只是在内存中记录状态,而是会生成一个有时效性的临时访问令牌(如JWT)。
  2. 资源服务验证: 客户端携带这个临时令牌去请求 ASR微服务ASR微服务 会独立验证该令牌的有效性(签名、有效期等),而无需再向排队服务进行查询。这降低了服务间的实时依赖。
  3. 回调网络化: 观察者模式中的回调通知,也需要通过网络实现。当用户会话结束时,排队服务可以通过Webhook调用 ASR微服务 预先注册的清理接口,或者向一个公共的消息队列(如RabbitMQ, Kafka)发布一条“会话结束”消息,ASR微服务 作为订阅者接收并处理该消息。

虽然微服务架构增加了复杂性,但它提供了更好的水平扩展性、故障隔离和团队独立开发的能力,是大型系统的必然演进方向。


总结

通过分析这些代码,我们构建了一个功能完善、架构清晰的实时排队系统。其关键成功要素包括:

  • 使用WebSocket实现了低延迟、实时的用户状态反馈。
  • 清晰的职责划分Handler负责协议层,Service负责业务逻辑,Interceptor负责安全。
  • 高效的并发管理,利用ConcurrentHashMapConcurrentLinkedQueue保证了多用户并发访问下的线程安全和性能。
  • 优雅的服务解耦,通过观察者模式,将核心的排队逻辑与具体的业务(如ASR服务)分离开来,极大地提高了系统的可维护性和可扩展性。

这套架构不仅适用于ASR服务,也可以轻松扩展到任何需要限制并发访问的资源密集型场景中,并且为未来向微服务架构演进奠定了良好的基础。


评论