注册

在Android中使用Netty进行通讯,附带服务端代码

Netty

Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。 Netty 是一个广泛使用的 Java 网络编程框架(Netty 在 2011 年获得了Duke's Choice Award,见http://www.java.net/dukeschoice… Facebook 和 Instagram 以及流行 开源项目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其强大的对于网络抽象的核心代码。

依赖引入

由于使用最新版本的话,发现有个类找不到,后面查了下是因为jdk版本,在android中的话,太高的jdk版本肯定不支持,所以我找了19年的发行版,测试ok。

implementation 'io.netty:netty-all:4.1.42.Final'

服务端代码实现

NettyServer

@Slf4j
public class NettyServer {

public void start(InetSocketAddress socketAddress) {
//new 一个主线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//new 一个工作线程组
EventLoopGroup workGroup = new NioEventLoopGroup(200);
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerChannelInitializer())
.localAddress(socketAddress)
//设置队列大小
.option(ChannelOption.SO_BACKLOG, 1024)
// 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
.childOption(ChannelOption.SO_KEEPALIVE, true);
//绑定端口,开始接收进来的连接
try {
ChannelFuture future = bootstrap.bind(socketAddress).sync();
log.info("服务器启动开始监听端口: {}", socketAddress.getPort());
// future.channel().writeAndFlush("你好啊");
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//关闭主线程组
bossGroup.shutdownGracefully();
//关闭工作线程组
workGroup.shutdownGracefully();
}
}
}

ServerChannelInitializer

public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//添加编解码
socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
socketChannel.pipeline().addLast(new NettyServerHandler());
}
}

NettyServerHandler

@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 客户端连接会触发
*/

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("Channel active......");
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("Channel Inactive......");
}

/**
* 客户端发消息会触发
*/

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//----------- 只改了这里 -----------
log.info("服务器收到消息1111: {}", msg.toString());

ctx.write("{\"data\":{\"taskData\":{\"collectionRule\":{\"id\":1,\"name\":\"IP主播直播室互动用户\",\"rule\":\"[{\\\"label\\\":\\\"抖音号\\\",\\\"key\\\":\\\"dyId\\\",\\\"type\\\":\\\"string\\\"},{\\\"key\\\":\\\"count\\\",\\\"label\\\":\\\"数量\\\",\\\"type\\\":\\\"string\\\"}]\",\"ruleType\":\"collect\",\"source\":\"collectLiveAudience\"},\"description\":\"粉丝列表-付鹏的财经世界3\",\"ruleId\":\"1\",\"ruleParam\":\"{\\\"dyId\\\":\\\"ghsys\\\",\\\"count\\\":\\\"140000\\\"}\"},\"taskId\":\"64\",\"taskType\":\"collection\"},\"devicesId\":\"5011bbdcd5006a93\",\"type\":\"task\"}");
ctx.flush();
}

/**
* 发生异常触发
*/

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

启动服务

@SpringBootApplication
public class ServerApplication {

public static void main(String[] args) {
SpringApplication.run(ServerApplication.class, args);
//启动服务端
NettyServer nettyServer = new NettyServer();
nettyServer.start(new InetSocketAddress("192.18.52.95", 8091));
}
}

客户端代码实现

其实netty的使用客户端和服务器端整体上是差不多的,所以这里只列出来核心代码。

初始化操作

abstract class McnNettyTask : Runnable {

private var socketChannel: SocketChannel? = null
private var isConnected = false

override fun run() {
createConnection()
}

private fun createConnection() {
val nioEventLoopGroup = NioEventLoopGroup()
val bootstrap = Bootstrap()
bootstrap
.group(nioEventLoopGroup)
.option(ChannelOption.TCP_NODELAY, true) //无阻塞
.channel(NioSocketChannel::class.java)
.option(ChannelOption.SO_KEEPALIVE, true) //长连接
.option(ChannelOption.SO_TIMEOUT, 30_000) //收发超时
.handler(McnClientInitializer(object : McnClientListener {
override fun disConnected() {
isConnected = false
}

override fun connected() {
isConnected = true
}
}, object : McnEventListener {
override fun onReceiverMessage(messageRequest: MessageRequest) {
dispatchMessage(messageRequest)
}
}))
try {
val channelFuture = bootstrap.connect(McnNettyConfig.ip, McnNettyConfig.port)
.addListener(object : ChannelFutureListener {
override fun operationComplete(future: ChannelFuture) {
if (future.isSuccess) {
socketChannel = future.channel() as SocketChannel;
isConnected = true
CommonConsole.log("netty connect success (ip: ${McnNettyConfig.ip}, port: ${McnNettyConfig.port})")

sendMsg(MessageRequest.createDevicesState(0))
} else {
CommonConsole.log("netty connect failure (ip: ${McnNettyConfig.ip}, port: ${McnNettyConfig.port})")
isConnected = false
future.channel().close()
nioEventLoopGroup.shutdownGracefully()
}
}
}).sync()//阻塞,直到连接完成
channelFuture.channel().closeFuture().sync()
} catch (ex: Exception) {
ex.printStackTrace()
} finally {
//释放所有资源和创建的线程
nioEventLoopGroup.shutdownGracefully()
}
}

fun isConnected(): Boolean {
return isConnected
}

fun disConnected() {
socketChannel?.close()
}

abstract fun dispatchMessage(messageRequest: MessageRequest)

fun sendMsg(msg: String, nettyMessageListener: McnMessageListener? = null) {
if (!isConnected()) {
nettyMessageListener?.sendFailure()
return
}
socketChannel?.run {
writeAndFlush(msg + "###").addListener { future ->
if (future.isSuccess) {
//消息发送成功
CommonConsole.log("netty send message success (message: $msg")
nettyMessageListener?.sendSuccess()
} else {
//消息发送失败
CommonConsole.log("netty send message failure (message: $msg")
nettyMessageListener?.sendFailure()
}
}
}
}
}

加载handler和Initializer

class McnClientInitializer(
private val nettyClientListener: McnClientListener,
private val nettyEventListener: McnEventListener
) :
ChannelInitializer<SocketChannel>() {

override fun initChannel(socketChannel: SocketChannel) {
val pipeline = socketChannel.pipeline()
// pipeline.addLast("decoder", McnStringDecoder())
// pipeline.addLast("encoder", McnStringEncoder())
// pipeline.addLast(LineBasedFrameDecoder(1024))
pipeline.addLast("decoder", StringDecoder())
// pipeline.addLast("encoder", StringEncoder())
pipeline.addLast(DelimiterBasedFrameEncoder("###"))
pipeline.addLast(McnClientHandler(nettyClientListener, nettyEventListener))
}
}

核心数据接收处理handler

class McnClientHandler(
private val nettyClientListener: McnClientListener,
private val nettyEventListener: McnEventListener
) :
SimpleChannelInboundHandler<String>() {

override fun channelActive(ctx: ChannelHandlerContext?) {
super.channelActive(ctx)
CommonConsole.log("Netty channelActive.........")
nettyClientListener.connected()
}

override fun channelInactive(ctx: ChannelHandlerContext?) {
super.channelInactive(ctx)
nettyClientListener.disConnected()
}

override fun channelReadComplete(ctx: ChannelHandlerContext?) {
super.channelReadComplete(ctx)
CommonConsole.log("Netty channelReadComplete.........")
}

override fun exceptionCaught(ctx: ChannelHandlerContext?, cause: Throwable?) {
super.exceptionCaught(ctx, cause)
CommonConsole.log("Netty exceptionCaught.........${cause?.message}")
cause?.printStackTrace()
ctx?.close()
}

override fun channelRead0(ctx: ChannelHandlerContext?, msg: String?) {
CommonConsole.log("Netty channelRead.........${msg}")
msg?.run {
try {
val messageRequest =
Gson().fromJson<MessageRequest>(msg, MessageRequest::class.java)
nettyEventListener.onReceiverMessage(messageRequest)
} catch (ex: Exception) {
ex.printStackTrace()
}
ReferenceCountUtil.release(msg)
}
}
}

处理数据粘包 & 数据分包

如果使用netty,你肯定会碰到数据粘包和数据分包的问题的。所谓数据粘包就是当数据量比较小的情况下,相近时间内的多个发送数据会被作为一个数据包接收解析。而数据分包就是会将一个比较大的数据包分成为很多个小的数据包。不管是数据的分包还是粘包,都会导致我们使用的时候不能简单使用数据,所以我们要对粘包和分包数据做处理,让每次发送的数据都是独立且完整的。

对于数据编码,使用自定义的解析器处理,相当于是对数据使用特定字符串做拼接和反取操作。

服务端:

ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,
Unpooled.wrappedBuffer(delimiter.getBytes())));
// 将分隔之后的字节数据转换为字符串数据
ch.pipeline().addLast(new StringDecoder());
// 这是我们自定义的一个编码器,主要作用是在返回的响应数据最后添加分隔符
ch.pipeline().addLast(new DelimiterBasedFrameEncoder("###"));
// 最终处理数据并且返回响应的handler
ch.pipeline().addLast(new EchoServerHandler());

客户端:

/ 对服务端返回的消息通过_$进行分隔,并且每次查找的最大大小为1024字节
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,
Unpooled.wrappedBuffer(delimiter.getBytes())));
// 将分隔之后的字节数据转换为字符串
ch.pipeline().addLast(new StringDecoder());
// 对客户端发送的数据进行编码,这里主要是在客户端发送的数据最后添加分隔符
ch.pipeline().addLast(new DelimiterBasedFrameEncoder("###"));
// 客户端发送数据给服务端,并且处理从服务端响应的数据
ch.pipeline().addLast(new EchoClientHandler());

DelimiterBasedFrameEncoder

public class DelimiterBasedFrameEncoder extends MessageToByteEncoder<String> {

private String delimiter;

public DelimiterBasedFrameEncoder(String delimiter) {
this.delimiter = delimiter;
}

@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out)
throws Exception {
// 在响应的数据后面添加分隔符
ctx.writeAndFlush(Unpooled.wrappedBuffer((msg + delimiter).getBytes()));
}
}

如果是

对客户端 & 服务端通讯加入[暗号]

在客户端和服务端数据通讯的时候,为了确保数据的完整性和安全性,通常会加入一段暗号,作为安全校验。其实两端真实的通信结构就变成了如下:

完整数据 = 暗号字节 + 真实数据内容

客户端和服务端在获取到数据之后,按照约定将暗号数据移除之后,剩下的就是正式的数据。

对于暗号的处理,可以通过MessageToMessageEncoder来实现,通过对获取到的通讯字节码编解码来对数据处理。

McnStringEncoder

/**
* copy自 StringEncoder源码,进行修改,增加了业务处理暗号
*/

class McnStringEncoder : MessageToMessageEncoder<CharSequence> {

var charset: Charset? = null

constructor(charset: Charset?) {
if (charset == null) {
throw NullPointerException("charset")
} else {
this.charset = charset
}
}

constructor() : this(Charset.defaultCharset())

override fun encode(ctx: ChannelHandlerContext?, msg: CharSequence?, out: MutableList<Any>?) {
if (msg?.isNotEmpty() == true) {
out?.add(
ByteBufUtil.encodeString(
ctx!!.alloc(),
CharBuffer.wrap(McnNettyConfig.private_key + msg),
charset
)
)
}
}
}

McnStringDecoder

/**
* copy自 StringDecoder源码,进行修改,增加了业务处理暗号
*/

class McnStringDecoder : MessageToMessageDecoder<ByteBuf> {
var charset: Charset? = null

constructor(charset: Charset?) {
if (charset == null) {
throw NullPointerException("charset")
} else {
this.charset = charset
}
}

constructor() : this(Charset.defaultCharset())

override fun decode(ctx: ChannelHandlerContext?, msg: ByteBuf?, out: MutableList<Any>?) {
msg?.run {
Log.e("info", "decoder结果====>${msg.toString(charset)}")
//校验报文长度是否合法
if (msg.readableBytes() <= McnNettyConfig.keyLength) {
out?.add(ErrorData.creator(ErrorData.LENGTH_ERROR, "报文长度校验失败"))
return
}
val privateKey = this.readBytes(McnNettyConfig.keyLength)
//校验报文暗号是否匹配
if (privateKey.toString(charset) != McnNettyConfig.private_key) {
out?.add(ErrorData.creator(ErrorData.PRIVATE_KEY_ERROR, "报文暗号校验失败"))
return
}
//获取真实报文内容
out?.add(this.toString(charset))
}
}

data class ErrorData(
var errorCode: Int,
var errorMsg: String
) {

companion object {

//长度异常
const val LENGTH_ERROR = -10001

//报文校验失败
const val PRIVATE_KEY_ERROR = -10002

@JvmStatic
fun creator(errorCode: Int, message: String): String {
val errorData = ErrorData(errorCode, message)
return JSON.toJSONString(errorData)
}
}
}
}

最后的使用就很简单了。只需要将我们的处理加到处理链就行了。

val pipeline = socketChannel.pipeline()
pipeline.addLast("decoder", McnStringDecoder())
pipeline.addLast("encoder", McnStringEncoder())

0 个评论

要回复文章请先登录注册