前言
由于项目需要使用WebSocket做分布式之间的通讯。所以最近研究了一下WebSocket在SpringBoot和Python中的通讯方法。
注意:WebSocket是最朴素的通讯方法,这也导致了直接使用较为繁琐。我们这次使用的是最原始的WebSocket协议,其他的类似于STOMP这种,算是其升级版。当然也有实现好了的类似于RabbitMQ这种消息队列系统,这些都不是我们今天讨论的内容。(太复杂了,我们时间不太够,先自己写一个)
Server端(SpringBoot)
第一步:导入依赖
项目采用Maven构建,在pom.xml中加入如下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
第一步:创建一个配置类
配置类:Spring Boot 的配置类通常是指那些使用了 @Configuration 注解的 Java 类,它们用来定义 Spring 容器中的 bean 对象。这些类允许你在代码中声明 bean,而不是通过 XML 文件来配置。
简单来说,就是使用Java代码的形式对代码进行配置。(类似于application.properties的作用)
配置类WebSocketConfig.java的代码如下:
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new MyTextWebSocketHandler(), "/server-data").setAllowedOrigins("*");
}
}
配置类的一般做法:实现/继承一个配置类(WebSocketConfigurer),然后@Override重写对应的方法。
这里就通过addHandler方法,新增一个监听器,监听器就是Handler的一个class,后面也可以通过继承的方式重写以实现自己的功能。
当然,你也可以配置多个Handler,Handler顾名思义,就是处理一个Websocket请求的类。
第二步,创建一个Handler处理WebSocket事件
SpringBoot的WebSocket默认提供了TextWebSocketHandler和BinaryWebSocketHandler。除此之外,Spring还提供了WebSocketHandler接口,你可以直接实现这个接口来创建自己的WebSocket处理器。这样可以提供更多的灵活性,因为你可以在实现中完全控制WebSocket连接的生命周期。
TextWebSocketHandler:用于收发文本数据
import org.springframework.web.socket.handler.TextWebSocketHandler;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
public class MyTextWebSocketHandler extends TextWebSocketHandler {
// 当WebSocket连接建立时调用
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
super.afterConnectionEstablished(session);
// 可以在这里发送一些初始化数据给客户端
session.sendMessage(new TextMessage("Connection established"));
}
// 当接收到客户端的消息时调用
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String payload = message.getPayload();
// 向客户端回送消息
session.sendMessage(new TextMessage("Received: " + payload));
}
// 当WebSocket连接关闭时调用
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
super.afterConnectionClosed(session, status);
// 可以在这里做一些清理工作
}
// 当出现异常时调用
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
super.handleTransportError(session, exception);
// 处理错误情况
}
}
BinaryWebSocketHandler:用于收发二进制数据(byte[])
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.BinaryWebSocketHandler;
import org.springframework.web.socket.CloseStatus;
public class MyBinaryWebSocketHandler extends BinaryWebSocketHandler {
// 当WebSocket连接建立时调用
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
super.afterConnectionEstablished(session);
// 可以在这里发送一些初始化数据给客户端
}
// 当接收到客户端的二进制消息时调用
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
byte[] payload = message.getPayload().array();
// 向客户端回送消息
session.sendMessage(new BinaryMessage(payload));
}
// 当WebSocket连接关闭时调用
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
super.afterConnectionClosed(session, status);
// 可以在这里做一些清理工作
}
// 当出现异常时调用
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
super.handleTransportError(session, exception);
// 处理错误情况
}
}
具体怎么用,注解已经写的非常清楚了。就是遇到什么事件调用什么事件。
Cilent端(Python)
Python中有两个关于WebSocket的库,一个是websocket-cilient(简称websocket),还有一个是websockets。两个有什么区别呢?前者更简单,但不支持异步(少量数据也用不到),后者更复杂,但是支持异步(多线程操作的感觉)
WebSocket库
使用websocket库的效果:
import websocket
def on_message(ws, message):
print(message)
ws.close()
def on_error(ws, error):
print(error)
def on_close(ws, close_status_code, close_msg):
print("### closed ###")
def on_open(ws):
ws.send("Hello, Server!")
if __name__ == "__main__":
websocket.enableTrace(True)
ws = websocket.WebSocketApp("ws://127.0.0.1:8090/server-data",
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close)
ws.run_forever(reconnect=1) # 如果断开连接,1s重试一次
只需要把回调函数传入即可,非常方便。
WebSockets库
import asyncio
import websockets
async def hello():
uri = "ws://127.0.0.1:8090/server-data"
async with websockets.connect(uri) as websocket:
await websocket.send("Hello, server!")
greeting = await websocket.recv()
print(f"< {greeting}")
asyncio.run(hello())
具体情况,具体选择吧,我们就只搞一个简单的指令传输,所以也用不到异步,所以我们选简单的那个。








