Marvin's Blog

Hi, My friend. Welcome to my space

0%

WebSocket

WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

在 WebSocket API 中,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道。两者之间就直接可以数据互相传送。

现在,很多网站为了实现推送技术,所用的技术都是 Ajax 轮询。轮询是在特定的的时间间隔(如每1秒),由浏览器对服务器发出HTTP请求,然后由服务器返回最新的数据给客户端的浏览器。这种传统的模式带来很明显的缺点,即浏览器需要不断的向服务器发出请求,然而HTTP请求可能包含较长的头部,其中真正有效的数据可能只是很小的一部分,显然这样会浪费很多的带宽等资源。

POM

1
2
3
4
5
6
7
8
9
10
11
12
13
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${springboot.version}</version>
</dependency>
<dependency>
<!-- websocket -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>${springboot.version}</version>
</dependency>
</dependencies>

YML

1
2
server:
port: 9119

@Configuration

重写modifyHandshake修改握手规则,用以向HttpSession存储WebSocketSession

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Configuration
@Slf4j
public class WebConfig extends ServerEndpointConfig.Configurator{

/**
* ServerEndpoint容器
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}

/**
* 重写修改WebSocket握手规则,在WebSocket配置中写入HttpSession
*/
@Override
public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {
Map<String, Object> userProperties = config.getUserProperties();
log.info("UserProperties is Null ? {}", (null == userProperties));
HttpSession httpSession = (HttpSession) request.getHttpSession();
log.info("Request [{}] HttpSession is Null ? {}", request.getRequestURI().getPath(), (null == httpSession));
if (null != userProperties && null != httpSession)
userProperties.put(HttpSession.class.getName(), httpSession);
}
}

@ServerEndpoint

服务端代码如下

configurator = WebConfig.class用以指定WebSocket配置类,该类需要继承javax.websocket.server.ServerEndpointConfig.Configurator

value = "/test/oneToOne"用以指定客户端访问路径,该服务端的全路径访问地址格式为ws://+服务端IP+服务端口(server.port: 9119)+/test/oneToOne

AtomicInteger onlineCount用以记录在线用户数量,AtomicInteger可保证并发请求时数量准确。

Map<String, Session> clientsSession.getId()key,存储Session,用以对多个客户端发送消息。

onOpen()客户端连接成功

onClose()客户端关闭连接

onError()客户端连接出错

onMessage()接收到客户端消息,此处为接收String类型消息后,再行转换。下一节将介绍实现对应的接口后,自动转换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@Slf4j
@ServerEndpoint(value = "/test/oneToOne",configurator = WebConfig.class)
@Component
public class OneToOneWebSocket {

private static AtomicInteger onlineCount = new AtomicInteger(0);

private static Map<String, Session> clients = new ConcurrentHashMap();

@OnOpen
public void onOpen(Session session,EndpointConfig config) {
onlineCount.incrementAndGet(); // 在线数加1
HttpSession httpSession = (HttpSession) config.getUserProperties().get(HttpSession.class.getName());
if (null != httpSession){
httpSession.setAttribute("socketSession",session);
}
log.info("{}",JSON.toJSONString(httpSession));
clients.put(session.getId(), session);
log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get());
Set<String> strings = clients.keySet();
this.sendMessage(MessageFormat.format("欢迎用户:{0} 加入连接,现有用户:{1}", session.getId() , strings), session);

Iterator<String> iterator = strings.iterator();
String sessionId;
while (iterator.hasNext()) {
sessionId = iterator.next();
if (!sessionId.equals(session.getId()))
this.sendMessage(MessageFormat.format("用户:{0} 加入连接,现有用户:{1}", session.getId(), strings), clients.get(sessionId));
}
}

@OnClose
public void onClose(Session session) {
onlineCount.decrementAndGet(); // 在线数减1
clients.remove(session.getId());
log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
Set<String> strings = clients.keySet();
Iterator<String> iterator = strings.iterator();
String sessionId;
while (iterator.hasNext()) {
sessionId = iterator.next();
this.sendMessage(MessageFormat.format("用户:{0} 关闭连接,现有用户:{1}", session.getId(), strings), clients.get(sessionId));
}
}

@OnMessage
public void onMessage(String message, Session session) {
log.info("服务端收到客户端[{}]的消息[{}]", session.getId(), message);
WebMessage myMessage = null;
try {
myMessage = JSON.parseObject(message, WebMessage.class);
} catch (Exception e) {
log.error("解析失败:{}", e);
}

if (myMessage != null && myMessage.getMessage() != null) {
if (null == myMessage.getUserId()) {
Set<String> strings = clients.keySet();
Iterator<String> iterator = strings.iterator();
String sessionId;
while (iterator.hasNext()) {
sessionId = iterator.next();
if (!session.getId().equals(sessionId))
this.sendMessage(MessageFormat.format("@所有人:{0},来自:{1}", myMessage.getMessage(), session.getId()), clients.get(sessionId));
}
} else {
Session toSession = clients.get(myMessage.getUserId());
// log.debug(toSession.getId());
if (toSession != null) {
this.sendMessage(MessageFormat.format("{0},来自:{1}", myMessage.getMessage(), session.getId()), toSession);
} else {
this.sendMessage(MessageFormat.format("发送失败!目标用户{0}不存在", myMessage.getUserId()), session);
}
}
}

}

@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
error.printStackTrace();
}

private void sendMessage(String message, Session toSession) {
try {
log.info("服务端给客户端[{}]发送消息[{}]", toSession.getId(), message);
toSession.getBasicRemote().sendText(message);
} catch (Exception e) {
log.error("服务端发送消息给客户端失败:{}", e);
}
}

前端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
if ("WebSocket" in window) {
console.log("浏览器支持 WebSocket")
}
var ws = new WebSocket("ws://192.168.1.10:9119/websocket/test");
//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function() {
ws.close();
}
//连接成功的回调方法
ws.onopen = function(event) {
console.log("websocket connected ...");
}
//连接关闭的回调方法
ws.onclose = function(event) {
console.log("websocket close ...");
}
//接收到消息
ws.onmessage = function(e) {
let msg = JSON.parse(e.data)
console.log(msg);

if (msg.msg && msg.msg === "id") {
$('input[name="from"]').val(msg.to);
}

if (msg.msg && msg.msg.length > 0 && msg.msg !== "id") {
$('#received').text(msg.msg + ' from ' + msg.from);
$('#received_box').css('display', 'block');
}

if (msg.online != null && msg.online.length > 0) {
let selects = [];
for (let i in msg.online) {
selects.push({name:msg.online[i],value:msg.online[i]})
}
xmSelect.render({
el: '#form_select_mulit',
filterable: true,
toolbar: { show: true },
theme: {
color: '#1E9FFF',
},
data: selects
});
form.render()
}
}

自动转换消息类型

  • 建立常规实体类,实现Serializable接口。

  • 建立两个转换类,分别实现javax.websocket.Decoder.Text<Message>javax.websocket.Encoder.Text<Message>接口并重写方法。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    //Encoder
    @Override
    public String encode(Message message) throws EncodeException {
    return JSON.toJSONString(message);
    }
    //Decoder
    @Override
    public Message decode(String message) throws DecodeException {
    return JSON.parseObject(message, Message.class);
    }

    @Override
    public boolean willDecode(String s) {
    return true;
    }
  • 服务端指定编码与解码消息对象类

    1
    @ServerEndpoint(value = "/websocket/test",configurator = WebConfig.class,decoders = MessageDecoder.class,encoders = MessageEncoder.class)
  • 服务端代码 重点:sendMessage(Message message,Session toSession)方法中的接收参数必须与onMessage(Session session,Message message)的参数一致。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    private void sendMessage(Message message,Session toSession){
    if (null == toSession) {
    log.error("session is Null. id [{}]",message.getTo());
    return;
    }
    try {
    toSession.getBasicRemote().sendText(JSON.toJSONString(message));
    log.info("server to [{}] send {} .",toSession.getId(), JSON.toJSONString(message));
    } catch (IOException e) {
    log.error("server to [{}] send {} failure . because : {}",toSession.getId(), JSON.toJSONString(message),e.getLocalizedMessage());
    }
    }

    @OnMessage
    public void onMessage(Session session,Message message){
    Session toSession = clients.get(message.getTo());
    message.setFrom(session.getId());
    sendMessage(message,toSession);
    // return message;
    }

    至止,WebSocket传递普通文本已经粗略完成。