注册

Android即时通讯系列文章番外篇(1)使用Netty框架快速搭设WebSocket服务器

前言

随着本系列所讨论技术点的逐步深入,仅靠之前提到的官方测试服务器已经不能满足我们演示的需要了,于是我们有必要尝试在本地搭建自己的WebSocket服务器,今天这篇文章就是介绍这方面的内容。

由于不属于原先的写作计划之内,同时也为了保持系列文章的连贯性,因此特意将本篇文章命名为「番外篇」。

Netty简单介绍

还记得前面的文章「 Android即时通讯系列文章(2)网络通信协议选型:应以什么样的标准去选择适合你应用的网络通信协议?」里我们所提到的吗?WebSocket本身只是一个应用层协议,原则上只要遵循这个协议的客户端/服务端均可使用。对于客户端,前面我们已明确采用OkHttp框架来实现了,而对于服务端,我们则计划采用Netty框架来实现。

Netty是什么?Netty是一款异步的、基于事件驱动的网络应用程序框架,支持快速开发可维护的、高性能的、面向协议的服务端和客户端。

Netty封装了Java NIO API的能力,把原本在高负载下繁琐且容易出错的I/O操作,隐藏在一个简单易用的API之下。这无疑对于缺少服务端编程经验的客户端开发人员是非常友好的,只要把Netty的几个核心组件弄明白了,快速搭设一个满足本项目演示需要的WebSocket服务器基本上没什么问题。

Netty核心组件

Channel

Channel是Netty传输API的核心,被用于所有的I/O操作,Channel 接口所提供的API大大降低了Java中直接使用Socket类的复杂性。

回调

Netty在内部使用了回调来处理事件,当一个回调被触发时,相关的事件可以交由一个ChannelHandler的实现处理。

Future

Future提供了一种在操作完成时通知应用程序的方式,可以看作是一个异步操作的结果的占位符,它将在未来的某个时刻完成,并提供对其结果的访问。

Netty提供了自己的实现——ChannelFuture,由ChannelFutureListener提供的通知机制消除了手动检查对应操作是否完成的步骤。

事件和ChannelHandler

Netty使用不同的事件来通知我们状态的改变,这使得我们能够基于已经发生的事件来触发适当的动作。

每个事件都可以被分发给ChannelHandler类,ChannelHandler类中提供了自定义的业务逻辑,架构上有助于保持业务逻辑与网络处理代码的分离。

用IntelliJ IDEA运行Netty的WebSocket演示代码

众所周知,Android Studio是基于IntelliJ IDEA开发的,因此对于习惯了用Android Studio进行开发的Android开发人员,用起IntelliJ IDEA来也几乎没有任何障碍。本篇的目的是快速搭设WebSocket服务器,因此选择直接将Netty的WebSocket演示代码拉取下来运行。在确保项目能成功运行起来的基础上,再逐步去分析演示代码。

该演示代码展示的交互效果很简单,跟前面的官方测试服务器一样,当客户端向服务端发送一个消息,服务器都会将消息原原本本地回传给客户端(没错,又是Echo Test。。。)。虽然看起来好像用处不大,但它充分地体现了客户端/服务器系统中典型的请求-响应交互模式。

接下来我们分别进行两端的工作:

服务端的工作:

  • IntelliJ IDEA左上角New-Project-Maven创建新工程
  • 拉取Netty的WebSocket演示代码到src目录下
  • 按Alt+Enter快捷键自动导入Netty依赖
  • 运行WebSocketServer类的main()函数

当控制台输出输出语句,即表示WebSocket服务器成功运行在本机上了:

Open your web browser and navigate to http://127.0.0.1:8080/

客户端的工作:

  • 保证手机网络与服务端在同一局域网下
  • 将要连接的WebSocket服务器地址更改为:ws://{服务端IP地址}:8080/websocket
  • 正常发送消息

从控制台可以看到,客户端成功地与WebSocket服务器建立了连接,并在发送消息后成功收到了服务器的回传消息:

11.png

WebSocket演示代码分析

总的来说,Netty的WebSocket演示代码中包含了两部分核心工作,其分别的意义以及对应的类如下表所示:

核心工作意义对应的类
提供ChannelHandler接口实现服务器对从客户端接收的数据的业务逻辑处理WebSocketServerHandler
ServerBootstrap实例创建配置服务器的启动,将服务器绑定到它要监听连接请求的端口上WebSocketServer

我们先来看看WebSocketServerHandler类核心工作的主要代码:

public class WebSocketServerHandler extends SimpleChannelInboundHandler {

private WebSocketServerHandshaker handshaker;

// ...省去其他代码

/**
* 当有新的消息传入时都会回调
*
* @param ctx
* @param msg
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof FullHttpRequest) {
handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
handleWebSocketFrame(ctx, (WebSocketFrame) msg);
}
}

private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
// ...省去其他代码

// 握手
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
getWebSocketLocation(req), null, true, 5 * 1024 * 1024);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
handshaker.handshake(ctx.channel(), req);
}
}

private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// ...省去其他代码

// 对于文本帧和二进制数据帧,将数据简单地回送给了远程节点。
if (frame instanceof TextWebSocketFrame) {
// Echo the frame
ctx.write(frame.retain());
return;
}
if (frame instanceof BinaryWebSocketFrame) {
// Echo the frame
ctx.write(frame.retain());
}
}

// ...省去其他代码

}

如你所见,为了处理所有接收到的数据,我们重写了WebSocketServerHandler类的channelRead()方法,重写的方法中主要处理了Http请求和WebSocket帧两种类型的数据。

Http请求类型的数据主要是为了处理客户端的握手建立连接过程,详情可参考前面的文章「 Android即时通讯系列文章(2)网络通信协议选型:应以什么样的标准去选择适合你应用的网络通信协议?」,这里就不再展开讲了。

而WebSocket帧类型的数据主要是为了处理来自客户端主动发送的消息,我们知道,当WebSocket连接建立之后,后续的数据都是以帧的形式发送。主要包含以下几种类型的帧:

  • 文本帧
  • 二进制帧
  • Ping帧
  • Pong帧
  • 关闭帧

其中,文本帧与二进制帧同属于消息帧,Ping帧和Ping帧主要用于连接保活,关闭帧则用于关闭连接,我们这里主要关心对消息帧的处理,可以看到,我们只是将数据简单回传回了远端节点,从而实现Echo Test。

然后,我们再回过头来看WebSocketServer类的核心工作的主要代码:

ublic final class WebSocketServer {

// ...省去其他代码
static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080"));

public static void main(String[] args) throws Exception {
// ...省去其他代码

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 指定所使用的NIO传输Channel
.childHandler(new WebSocketServerInitializer(sslCtx));

// 使用指定的端口,异步地绑定服务器;调用sync()方法阻塞等待直到绑定完成
Channel ch = b.bind(PORT).sync().channel();

System.out.println("Open your web browser and navigate to " +
(SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');

// 获取Channel的CloseFuture,并且阻塞当前线程直到它完成
ch.closeFuture().sync();
} finally {
// 关闭EventLoopGroup,释放所有的资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

我们使用ServerBootstrap引导类来完成Websocket服务器的网络层配置,随后调用bind(int inetPort)方法将进程绑定到某个指定的端口,此过程称之为引导服务器。

我们是如何将前面定义的WebSocketServerHandler与ServerBootstrap关联起来的呢?关键就在于childHandler(ChannelHandler childHandler)方法。

每个Channel都拥有一个与之相关联的ChannelPipeline,其持有一个ChannelHandler的实例链。我们需要提供一个ChannelInitializer的实现,并在其initChannel()回调方法中,将包括WebSocketServerHandler在内的一组自定义的ChannelHandler安装到ChannelPipeline中:

public class WebSocketServerInitializer extends ChannelInitializer {

// ...省去其他代码

public WebSocketServerInitializer(SslContext sslCtx) {
// ...省去其他代码
}

@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new WebSocketServerHandler());
}
}

将Echo形式改为Broadcast形式

我们之前讲过,现今主流的IM应用几乎都是采用服务器中转的方式来进行消息传输的,为了更好地实践这种设计,我们进一步来对WebSocket服务器进行改造,把Echo形式改为Broadcast形式,即:

当接收到某一客户端的一条消息之后,将该消息转发给服务端维护的、除发送方之外的其他客户端连接。

要实现这一功能我们需要用到ChannelGroup类,ChannelGroup负责跟踪所有活跃中的WebSocket连接,当有新的客户端通过握手成功建立连接后,我们就要把这个新的Channel添加到ChannelGroup中去。

当接收到了WebSocket消息帧数据后,就调用ChannelGroup的writeAndFlush()方法将消息传输给所有已经连接的WebSocket Channel。

ChannelGroup还允许传递过滤参数,我们可以以此过滤掉发送方的Channel。

public class WebSocketServerHandler extends SimpleChannelInboundHandler {

// ...省去其他代码
private final ChannelGroup group;

public WebSocketServerHandler(ChannelGroup group) {
this.group = group;
}

private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// ...省去其他代码
if (frame instanceof TextWebSocketFrame) {
// ctx.write(frame.retain());
group.writeAndFlush(frame.retain(), ChannelMatchers.isNot(ctx.channel()));
return;
}
if (frame instanceof BinaryWebSocketFrame) {
// ctx.write(frame.retain());
group.writeAndFlush(frame.retain(), ChannelMatchers.isNot(ctx.channel()));
}
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
// 将新的WebSocket Channel添加到ChannelGroup 中,以便它可以接收到所有的消息
group.add(ctx.channel());
} else {
super.userEventTriggered(ctx, evt);
}
}
}


运行起来之后,让多个客户端连接到此服务器,当客户端中的一个发送了一条消息后,其他连接的客户端会收到由服务器广播的这一条消息:

12.png 13.png

相关源码已上传到Github

总结

为了满足更多场景的演示需要,我们使用了Netty框架来快速搭建本机的WebSocket服务器。

我们基于Netty的WebSocket演示代码进行改造,核心工作包括以下两部分:

  • 配置服务器的启动,将服务器绑定到它要监听连接请求的端口上
  • 服务器对从客户端接收的数据的业务逻辑处理

我们先是以简单的Echo形式实现了客户端/服务器系统中典型的请求/响应交互模式,并进一步改用广播形式,实现了多个用户之间的相互通信。

0 个评论

要回复文章请先登录注册