关于WebSocket分布式实现的一种方案

WebSocket常用于做后台消息推送,也可以做简易的IM聊天,由于WebSocket中的Session没有实现序列化接口的,我们无法将session序列化实现分布式部署,今天就来记录一种分布式的实现方案。

实现原理

首先我们讲的这种方式是利用redis订阅和发布模式来实现,大致过程:

  • 每个服务器记录连接,保存在内存当中
  • 当需要推送websocket消息的时候,同时在redis发布一个消息
  • 每个服务器订阅redis的消息,当监听到有消息时,每台服务器遍历自己内存当中的连接进行发送

这样我们就可以实现websocket的分布式部署,当然redis订阅和发布也可以用其他消息队列工具类实现。

实现过程

这里并不打算贴所有代码,只记录关键的一些代码,其余的可以网上查看相关资料,基于SpringBoot2.1.8实现。

pom文件引入redis和websocket

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--websocket-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

写一个redis发布器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component
public class PublishService {
@Autowired
StringRedisTemplate redisTemplate;

/**
* 发布方法
*
* @param channel 消息发布订阅 主题
* @param message 消息信息
*/
public void publish(String channel, Object message) {
redisTemplate.convertAndSend(channel, message);
}
}

写一个redis监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class SubscribeListener implements MessageListener {

/**
* 订阅接收发布者的消息
*/
@Override
public void onMessage(Message message, byte[] pattern) {
String msg = new String(message.getBody());
System.out.println(new String(pattern) + "接收消息:" + msg);
//遍历本地内存当中的websocket连接...
//拿到对应的websocket session就可以进行推送消息
}

}

配置下websocket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
public class WebSocketConfig extends ServerEndpointConfig.Configurator{
@Override
public void modifyHandshake(ServerEndpointConfig sec,
HandshakeRequest request, HandshakeResponse response) {
// 主要为了能在websocket打开连接时获取httpsession和当前登陆用户,此处跟本文内容没有关系
HttpSession httpSession=(HttpSession) request.getHttpSession();
//存入httpsession
sec.getUserProperties().put(HttpSession.class.getName(),httpSession);
//存入当前用户
sec.getUserProperties().put("user", ShiroUtils.getCurrentUser());
super.modifyHandshake(sec, request, response);

}

/**
* 自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

配置redis消息发布订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* redis 消息监听 用于websocket 分布式处理
* @param redisConnectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory){
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
//设置订阅topic
redisMessageListenerContainer.addMessageListener(new SubscribeListener(), new ChannelTopic("socket_topic"));
return redisMessageListenerContainer;
}

再写一个简易的存储工具类,这个就基于ConcurrentHashMap就能实现,不记录了。

最后来看websocket ServerEndpoint 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@OnOpen
public void onOpen(Session session, EndpointConfig config) {
log.debug("websocket:打开连接");
//将连接存入我们的缓存工具,这个工具就是简单的存储,代码自己写一个
CacheSessionMap.put("可以是sessionId,也可以实当前用户ID",session);

}

@OnClose
public void onClose(Session session) {
log.debug("websocket:关闭连接");
//关闭的连接,我们将其移除
CacheSessionMap.remove("可以是sessionId,也可以实当前用户ID");

}

@OnMessage
public void onMessage(Session session, String message) throws IOException {
log.debug("websocket:消息来了");
//用我们之前写的redis消息发布器,将这个消息发布到redis
publishService.publish("socket_topic", message);
}

关键代码就已经结束了,赶紧试试吧!

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×