WebSocket

WebSocket是HTML5的一种新的协议,它实现了浏览器与服务器的全双工通信的推送技术

全双工和半双工

  • 全双工:指的是可以在同一时间段同时进行信号的双向传输,即A->B的同时B->A
  • 半双工:指的是在同一时间段只能有一个动作进行传输,即A->B结束后才可以B->A

项目地址

SpringBoot使用

引入

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

创建拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package jtxyh.top.chat.sys.interceptor;

import com.alibaba.fastjson.JSON;
import jtxyh.top.chat.sys.enetity.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;

@Component
public class ChatInterceptor implements HandshakeInterceptor {

@Autowired
private StringRedisTemplate rs;

private String key = "INLINE_USER";

// 前置拦截
@Override
public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
// 从redis中获取到用户信息
HashOperations<String, Object, Object> ops = rs.opsForHash();
Object loginUser = ops.get(key, "loginUser");
// 有用户信息代表登陆了
if (loginUser != null) {
User user = JSON.parseObject((String) loginUser, User.class);
map.put("loginUser", user);
// 把在线用户存入redis
ops.put(key, user.getUsername(), JSON.toJSONString(user));
}
// 返回true可以放行
return true;
}

// 后置拦截
@Override
public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {

}
}

创建处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package jtxyh.top.chat.sys.controller;

import com.alibaba.fastjson.JSON;
import jtxyh.top.chat.sys.enetity.Message;
import jtxyh.top.chat.sys.enetity.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;

import java.io.IOException;
import java.util.*;

@Component
public class ChatWebSocketHandler implements WebSocketHandler {

@Autowired
private StringRedisTemplate rs;

private String key = "INLINE_USER";

// 存储所有的在线用户
private static final Map<String, WebSocketSession> IN_LINE_USER;


static {
IN_LINE_USER = new HashMap<>();
}

/**
* 连接建立之后的动作
*
* @param webSocketSession 当前用户的信息会存储在这里面
*/
@Override
public void afterConnectionEstablished(WebSocketSession webSocketSession) throws Exception {
// 存储当前登陆用户到在线用户中
User user = (User) webSocketSession.getAttributes().get("loginUser");
IN_LINE_USER.put(user.getUsername(), webSocketSession);
noticeMessage("欢迎【" + user.getUsername() + "】来到聊天室!");
}

/**
* 客户端给服务器发送消息时的处理函数
*
* @param webSocketSession 用户信息
* @param webSocketMessage 消息信息
*/
@Override
public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage) throws Exception {
// 没有收到消息直接返回
if (webSocketMessage.getPayloadLength() == 0) return;
// 从消息中获取到数据,转换为自己的message对象
Message msg = JSON.parseObject(webSocketMessage.getPayload().toString(), Message.class);
// 从webSocketSession中获取到发送消息的用户
User user = (User) webSocketSession.getAttributes().get("loginUser");
msg.setUsername(user.getUsername());
// 把消息发送到前端
sendMessageToAll(new TextMessage(JSON.toJSONBytes(msg)));
}

/**
* 出现异常进行操作
*
* @param webSocketSession
* @param throwable
*/
@Override
public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) throws Exception {
}

/**
* webSocket关闭的回调函数,就是当用户退出之后进行的操作
*
* @param webSocketSession 用户信息
* @param closeStatus 退出的状态码
*/
@Override
public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) throws Exception {
User user = (User) webSocketSession.getAttributes().get("loginUser");
// 移除用户
IN_LINE_USER.remove(user.getUsername());
// 移除redis中的在线用户名
rs.opsForHash().delete(key, user.getUsername());
noticeMessage("【" + user.getUsername() + "】离开了聊天室!");
}

/**
* 这个类是指是否要把消息分段发送,返回true就是把消息拆分
*/
@Override
public boolean supportsPartialMessages() {
return false;
}

/**
* 发送消息给所有的在线用户
*/
private void sendMessageToAll(TextMessage message) {
// 循环遍历发送消息
IN_LINE_USER.forEach((s, webSocketSession) -> {
// 判断用户是否还在线
if (webSocketSession.isOpen()) {
// 使用多线程方式发送消息
new Thread(() -> {
try {
webSocketSession.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
});
}

/**
* 用户登陆或者退出的时候给客户端发送消息
*/
private void noticeMessage(String message) {
// 获取当前登陆用户的信息
Message msg = new Message();
msg.setMsg(message);
msg.setLoginUser(new ArrayList<>());
// 存储所有在线用户
IN_LINE_USER.forEach((s, webSocketSession) -> {
msg.getLoginUser().add((User) webSocketSession.getAttributes().get("loginUser"));
});
msg.setUsername("系统消息");
// 向所有用户发送消息
sendMessageToAll(new TextMessage(JSON.toJSONBytes(msg)));
}
}

创建配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package jtxyh.top.chat.sys.ws;

import jtxyh.top.chat.sys.controller.ChatWebSocketHandler;
import jtxyh.top.chat.sys.interceptor.ChatInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;


@Component
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

// 添加自己定义的请求的处理器
@Autowired
private ChatWebSocketHandler chatWebSocketHandler;

// 添加自己定义的过滤器
@Autowired
private ChatInterceptor chatInterceptor;

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 添加处理器处理路径,处理器必须是实现WebSocketHandler接口的
// 添加过滤器,过滤器必须是实现HandshakeInterceptor接口的
// setAllowedOrigins("*")设置可以跨域
registry.addHandler(chatWebSocketHandler, "/ws").addInterceptors(chatInterceptor).setAllowedOrigins("*");
}
}

前端JS代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
var webSocket;

// 连接websocket
connectWebSocket();

// 关闭socket
$("#close").on("click", () => {
webSocket.close();
});

// 连接websocket
function connectWebSocket() {
webSocket = new WebSocket("ws://localhost:8080/ws");
// 打开连接后执行的逻辑
webSocket.onopen = function (event) {
};
// 监听服务器发送过来的消息
webSocket.onmessage = function (event) {
};
// 监听关闭
webSocket.onclose = function (event) {
}
}

// 发送消息
function sendMessage() {
let message = $("#message");
if (message.val() === "") return;
let msg = {"msg": message.val(), "username": localStorage.getItem("username")};
// 使用send发送消息
webSocket.send(JSON.stringify(msg));
message.val("");
}

相关文章

数据库连接池

Junit和Spring

Tomcat

Servlet

Request,Response和ServletContext

Cookie和Session

JSP和EL和Jstl

Filter和Listener

Mybatis

SpringCache