基于WebSocket和观察者模式的排队系统设计与实现
在高并发系统中,当我们需要限制对某些昂贵或有限资源的访问时(例如,AI服务、实时转码、数据库连接池等),一个健壮的排队系统是不可或缺的。本文将通过分析一个具体的Java后端实现,深入探讨如何利用WebSocket构建一个实时的、可扩展的排队服务,并重点介绍如何通过观察者模式实现模块间的优雅解耦。
一、 整体设计思路与架构
系统的核心目标是:允许多个用户通过客户端连接到服务,但只允许有限数量(例如 MAX_ACTIVE_USERS)的用户同时访问核心资源。超出此数量的用户需要进入一个等待队列,并在有用户离开时按顺序获得访问权限。整个过程需要对用户实时透明。
基于这个目标,我们选择了以下技术栈和架构:
- WebSocket: 用于实现客户端与服务器之间的全双工实时通信。这使得服务器可以主动向排队中的用户推送他们的位置更新,或在轮到他们时立即授予访问权限,提供了极佳的用户体验。
- Spring Boot: 作为后端开发框架,集成了强大的WebSocket支持。
- 内存队列: 使用线程安全的数据结构(如
ConcurrentHashMap和ConcurrentLinkedQueue)在服务器内存中管理“活动用户”和“等待队列”,实现高效的状态管理。 - 观察者模式: 用于解耦排队服务与其他业务服务(如ASR服务)。当一个用户的会话结束时,排队服务能够通知其他关心此事件的服务进行相应的资源清理,而无需硬编码依赖。
整个系统的交互流程如下:
- 客户端携带认证Token,通过WebSocket连接到排队服务的
/api/v1/queue端点。 - 服务器通过
HandshakeInterceptor拦截握手请求,验证Token的有效性。 - 验证通过后,
QueueWebSocketHandler接收连接,并将其交由核心的QueueService处理。 QueueService判断当前活动用户数是否已满:- 未满: 将用户设为活动状态,并发送
ACCESS_GRANTED消息。 - 已满: 将用户加入等待队列,并广播
QUEUE_UPDATE消息给所有排队者。
- 未满: 将用户设为活动状态,并发送
- 当一个活动用户断开连接时,
QueueService将其移除,并从队列头部取出一个用户,授予其访问权限。 - 同时,
QueueService通过观察者模式通知其他订阅了“会话结束”事件的服务(如ASRProxyService),以便它们可以清理相关资源。
二、 核心实现深度解析
1. 连接建立与身份认证
任何WebSocket连接的第一步都是建立连接(握手)。为了确保只有授权用户才能进入排队系统,我们在握手阶段就进行身份验证。这是通过实现Spring的 HandshakeInterceptor 接口完成的。
TokenAuthHandshakeInterceptor.java:
@Component
public class TokenAuthHandshakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
// 从URL查询参数中获取token
String token = UriComponentsBuilder.fromUri(request.getURI()).build().getQueryParams().getFirst("token");
if (token == null || token.trim().isEmpty()) {
return false; // 拒绝连接
}
try {
// 通过token获取用户ID
String userId = (String) StpUtil.getLoginIdByToken(token);
// 将用户信息存入WebSocket session的attributes中
attributes.put("userId", userId);
return true; // 允许连接
} catch (NotLoginException e) {
return false; // 拒绝连接
}
}
// ...
}
这个拦截器在 beforeHandshake 方法中,从连接的URL(例如 ws://server.com/api/v1/queue?token=xxx)中提取token参数。然后,它使用一个认证框架(这里是Sa-Token)来验证token并解析出用户ID。最关键的一步是 attributes.put("userId", userId);,它将用户ID存入了WebSocket Session的属性中。这样,在后续的业务逻辑里,我们随时可以从Session中获取当前用户的身份。
最后,在 WebSocketConfig.java 中,我们将这个拦截器注册到 /api/v1/queue 路径上,使其生效。
2. 核心排队逻辑 (QueueService)
QueueService 是整个排队系统的“大脑”,它负责管理两个核心数据结构:
activeSessions: 一个ConcurrentHashMap<String, WebSocketSession>,用于存储正在使用服务的活动用户。Key是userId,Value是他们的WebSocketSession。使用userId作为Key可以有效防止同一用户重复进入活动列表。waitingQueue: 一个ConcurrentLinkedQueue<WebSocketSession>,一个高性能的线程安全队列,用于存储正在等待的用户,严格保证先进-先出(FIFO)。
新用户连接处理 (handleConnectionEstablished)
public synchronized void handleConnectionEstablished(WebSocketSession session) {
String userId = getUserId(session);
// 检查用户是否已经存在于活动会话或等待队列中
if (isUserAlreadyConnected(userId)) {
// 发送重复连接错误并关闭新连接
return;
}
if (activeSessions.size() < MAX_ACTIVE_USERS) {
// 有空位,直接授予访问权限
grantAccess(session);
} else {
// 无空位,加入排队队列
addToQueue(session);
}
}
这里的逻辑非常清晰:
- 防重入检查:
isUserAlreadyConnected方法会检查该userId是否已存在于activeSessions或waitingQueue中,防止同一用户重复排队。 - 分配逻辑: 如果活动用户数未达到上限,则调用
grantAccess方法,将用户加入activeSessions并发送ACCESS_GRANTED消息。否则,调用addToQueue方法,将其加入waitingQueue并广播队列状态更新。
用户断开连接处理 (handleConnectionClosed)
public synchronized void handleConnectionClosed(WebSocketSession session) {
String userId = getUserId(session);
// ...
if (activeSessions.containsKey(userId)) {
// 确保断开的是当前存储的那个session
if (activeSessions.get(userId).getId().equals(session.getId())) {
activeSessions.remove(userId);
logger.info("活动用户 {} 已离开, 释放一个名额.", userId);
// 触发回调,通知所有监听器
listeners.forEach(listener -> listener.onSessionEnd(userId));
// 从队列中提拔一位用户
promoteNextInQueue();
}
} else {
// 如果是排队用户离开,则从队列中移除
if (waitingQueue.remove(session)) {
broadcastQueueUpdates();
}
}
}
这是系统的关键环节:
- 判断用户类型: 检查该用户是在
activeSessions中还是在waitingQueue中。 - 释放名额: 如果是活动用户离开,就从
activeSessions中移除,并调用promoteNextInQueue()方法,该方法会从waitingQueue头部取出一个用户并授予他访问权限。 - 触发回调: 这是观察者模式的核心,我们稍后详述。
- 更新队列: 如果是等待中的用户离开,只需将其从
waitingQueue中移除,并向其他等待者广播最新的排队信息。
三、 基于观察者模式的服务解耦与架构思考
1. 资源服务的权限验证
在我们的系统中,QueueService 扮演着“守门人”的角色,而 ASRProxyService(语音识别代理服务)则是受保护的“资源”。当一个用户在通过排队获得访问资格后,他会尝试连接ASR服务的WebSocket端点。此时,ASRProxyService 必须再次进行验证,确保这个用户是“合法的”。
ASRProxyService 的 startProxy 方法会执行这个检查:它会调用 queueService.isUserActive(userId) 方法,查询该用户ID当前是否存在于 QueueService 的活动用户列表 activeSessions 中。只有当用户处于活动状态时,ASRProxyService 才会为其建立到上游供应商的连接;否则,将拒绝连接。这保证了只有通过排队系统授权的用户才能消耗昂贵的ASR资源。
2. 使用观察者模式进行状态同步
验证了用户有权访问后,下一个问题是:当用户会话结束时,如何优雅地清理 ASRProxyService 中创建的各种资源(如与上游服务的连接)?
一种糟糕的设计是让 QueueService 在用户断开时,硬编码去调用 ASRProxyService 的清理方法。这会造成紧耦合:
QueueService将依赖ASRProxyService,未来如果新增TTSService、ImageService等,都需要修改QueueService的代码。- 代码职责不清,
QueueService承担了它不应该关心的业务逻辑。
为了解决这个问题,我们引入了观察者模式。
3. 定义事件监听器接口 (UserSessionEventListener)
我们首先定义一个抽象的“事件”,即“用户会话结束”。任何关心这个事件的模块,都应该能够“听到”它。为此,我们创建了一个接口:
UserSessionEventListener.java:
/**
* 用户活动会话事件监听器接口
* 这里就是用观察者模式来监听用户会话结束事件。
*/
public interface UserSessionEventListener {
/**
* 当一个用户的活动会话结束时调用
* @param userId 结束会话的用户ID
*/
void onSessionEnd(String userId);
}
这个接口非常简单,只定义了一个 onSessionEnd 方法。任何需要对用户会话结束做出反应的服务,都应该实现这个接口。
4. QueueService 作为主题(Subject)
QueueService 是事件的发布者(也称为“主题”或“被观察者”)。它维护一个监听器列表,并在事件发生时通知所有监听器。
QueueService.java:
public class QueueService {
// ...
private final List<UserSessionEventListener> listeners = new ArrayList<>();
public void addUserSessionEventListener(UserSessionEventListener listener) {
this.listeners.add(listener);
}
// 在 handleConnectionClosed 方法中...
public synchronized void handleConnectionClosed(WebSocketSession session) {
// ...
if (activeSessions.containsKey(userId)) {
// ...
activeSessions.remove(userId);
// 触发回调,通知所有监听器
listeners.forEach(listener -> listener.onSessionEnd(userId));
promoteNextInQueue();
}
}
}
QueueService 提供了 addUserSessionEventListener 方法,允许其他服务将自己“注册”为监听器。当 handleConnectionClosed 检测到一个活动用户离开时,它会遍历 listeners 列表,并调用每个监听器的 onSessionEnd 方法。
5. ASRProxyService 作为观察者(Observer)
ASRProxyService 是事件的订阅者(“观察者”)。它实现了 UserSessionEventListener 接口,并在应用启动时将自己注册到 QueueService。
ASRProxyService.java:
@Service
public class ASRProxyService implements UserSessionEventListener {
@Autowired
public ASRProxyService(QueueService queueService) {
// ...
}
@PostConstruct
public void register() {
queueService.addUserSessionEventListener(this);
}
@Override
public void onSessionEnd(String userId) {
// 清理与该用户相关的所有ASR代理
stopProxy(userId);
}
// ... 其他ASR代理逻辑
}
通过 @PostConstruct 注解,ASRProxyService 在初始化后立即将自己注册为监听器。当 QueueService 发出 onSessionEnd 通知时,ASRProxyService 的同名方法就会被执行,从而触发资源清理。这种设计完美实现了开闭原则。
6. 架构演进思考:从单体到微服务
当前的设计(服务间直接方法调用和内存中的监听器列表)在单体应用中非常高效。QueueService 和 ASRProxyService 在同一个进程中,通信成本极低。
但如果系统规模扩大,我们可能会考虑将排队系统设计为一个独立的微服务。在这种架构下,服务间的交互方式会发生根本变化:
- 鉴权方式改变: 当排队服务授予用户访问权限时,它不再只是在内存中记录状态,而是会生成一个有时效性的临时访问令牌(如JWT)。
- 资源服务验证: 客户端携带这个临时令牌去请求
ASR微服务。ASR微服务会独立验证该令牌的有效性(签名、有效期等),而无需再向排队服务进行查询。这降低了服务间的实时依赖。 - 回调网络化: 观察者模式中的回调通知,也需要通过网络实现。当用户会话结束时,排队服务可以通过Webhook调用
ASR微服务预先注册的清理接口,或者向一个公共的消息队列(如RabbitMQ, Kafka)发布一条“会话结束”消息,ASR微服务作为订阅者接收并处理该消息。
虽然微服务架构增加了复杂性,但它提供了更好的水平扩展性、故障隔离和团队独立开发的能力,是大型系统的必然演进方向。
总结
通过分析这些代码,我们构建了一个功能完善、架构清晰的实时排队系统。其关键成功要素包括:
- 使用WebSocket实现了低延迟、实时的用户状态反馈。
- 清晰的职责划分,
Handler负责协议层,Service负责业务逻辑,Interceptor负责安全。 - 高效的并发管理,利用
ConcurrentHashMap和ConcurrentLinkedQueue保证了多用户并发访问下的线程安全和性能。 - 优雅的服务解耦,通过观察者模式,将核心的排队逻辑与具体的业务(如ASR服务)分离开来,极大地提高了系统的可维护性和可扩展性。
这套架构不仅适用于ASR服务,也可以轻松扩展到任何需要限制并发访问的资源密集型场景中,并且为未来向微服务架构演进奠定了良好的基础。