使用Netty作为TCP的粘包分包处理
平常用tcp作为消息传输的载体时,需要做消息的粘包和拆包的处理,个人建议使用SCTP协议更好(SCTP有类似UDP以消息为单位对外发送、类似TCP有消息不丢失的优点,最适合传输消息类数据)。
先看server端:
- 建立一个server:
public void start(){
// 用来接收进来的连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 用来处理已经被接收的连接,一旦bossGroup接收到连接,就会把连接信息注册到workerGroup上
EventLoopGroup workGroup = new NioEventLoopGroup();
// nio服务的启动类
ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class) // 说明一个新的Channel如何接收进来的连接
.option(ChannelOption.SO_BACKLOG, 128) // tcp最大缓存链接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) //保持连接
.handler(new LoggingHandler(LogLevel.INFO)) // 打印日志级别
.childHandler(new TcpServerChannelInitializer());
try {
ChannelFuture future = server.bind(port).sync(); // 绑定端口,开始接受链接
future.channel().closeFuture().sync();// 等待服务端口的关闭;在这个例子中不会发生,但你可以优雅实现;关闭你的服务
} catch (InterruptedException e) {
Log.e(TAG,”server start fail”,e);
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
- 再继承建立一个通道初始化类:
public class TcpServerChannelInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
/*
为了解决网络数据流的拆包粘包问题,Netty 为我们内置了如下的解码器:
ByteToMessageDecoder:如果想实现自己的半包解码器,实现该类;
MessageToMessageDecoder:一般作为二次解码器,当我们在 ByteToMessageDecoder 将一个 bytes 数组转换成一个 java 对象的时候,我们可能还需要将这个对象进行二次解码成其他对象,我们就可以继承这个类;
LineBasedFrameDecoder:通过在包尾添加回车换行符 \r\n 来区分整包消息;
StringDecoder:字符串解码器;
DelimiterBasedFrameDecoder:特殊字符作为分隔符来区分整包消息;
FixedLengthFrameDecoder:报文大小固定长度,不够空格补全;
ProtoBufVarint32FrameDecoder:通过 Protobuf 解码器来区分整包消息;
ProtobufDecoder: Protobuf 解码器;
LengthFieldBasedFrameDecoder:指定长度来标识整包消息,通过在包头指定整包长度来约定包长。
Netty 还内置了如下的编码器:
ProtobufEncoder:Protobuf 编码器;
MessageToByteEncoder:将 Java 对象编码成 ByteBuf;
MessageToMessageEncoder:如果不想将 Java 对象编码成 ByteBuf,而是自定义类就继承这个;
LengthFieldPrepender:LengthFieldPrepender 是一个非常实用的工具类,如果我们在发送消息的时候采用的是:消息长度字段+原始消息的形式,那么我们就可以使用 LengthFieldPrepender。这是因为 LengthFieldPrepender 可以将待发送消息的长度(二进制字节长度)写到 ByteBuf 的前两个字节。
*/
// 这里相当于过滤器,可以配置多个
//处理粘包,要和客户器端发送消息对应,消息格式应该为:4字节消息内容长度+消息内容
pipeline.addLast(new LengthFieldBasedFrameDecoder(
1024, // 帧的最大长度,即每个数据包最大限度
0, // 长度字段偏移量
4, // 长度字段所占的字节数
0, // 长度的修正值,如果长度字段没有包含尾部所有长度,则这里为正,如果长度字段包含了长度自身和长度偏移量,则这里为负
4) // 需要忽略的字节数,从消息帧的第一字节开始算
);
// 自己的逻辑Handler
pipeline.addLast(“handler”, new TcpServerHandler());
}
}
- 最后继承实现一个通道处理类:
public class TcpServerHandler extends SimpleChannelInboundHandler {
private static final String TAG = “TcpServerHandler”;
private int counter;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Log.i(TAG, “server channelActive”);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收消息的处理
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, StandardCharsets.UTF_8);
System.out.println(“—–start——\n”+ body + “\n——end——“);
//发送消息到客户端
String content = “receive” + ++counter;
ByteBuf resp = Unpooled.buffer();
resp.writeShort(content.getBytes(StandardCharsets.UTF_8).length);
resp.writeBytes(content.getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(resp);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ctx.close();
}
}
再看Client端
其实client端和server的处理类似。
- 先连接到server端:
public void start(){
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new TcpClientChannelInitializer());
try {
ChannelFuture future = bootstrap.connect(address,port).sync();// 客户端开启
future.channel().writeAndFlush(“Hello world, i’m online”);
future.channel().closeFuture().sync();// 等待直到连接中断
} catch (Exception e) {
Log.e(TAG, “client start fail”,e);
}finally {
group.shutdownGracefully();
}
}
- 然后建立通道初始化类:
public class TcpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
/*
为了解决网络数据流的拆包粘包问题,Netty 为我们内置了如下的解码器:
ByteToMessageDecoder:如果想实现自己的半包解码器,实现该类;
MessageToMessageDecoder:一般作为二次解码器,当我们在 ByteToMessageDecoder 将一个 bytes 数组转换成一个 java 对象的时候,我们可能还需要将这个对象进行二次解码成其他对象,我们就可以继承这个类;
LineBasedFrameDecoder:通过在包尾添加回车换行符 \r\n 来区分整包消息;
StringDecoder:字符串解码器;
DelimiterBasedFrameDecoder:特殊字符作为分隔符来区分整包消息;
FixedLengthFrameDecoder:报文大小固定长度,不够空格补全;
ProtoBufVarint32FrameDecoder:通过 Protobuf 解码器来区分整包消息;
ProtobufDecoder: Protobuf 解码器;
LengthFieldBasedFrameDecoder:指定长度来标识整包消息,通过在包头指定整包长度来约定包长。
Netty 还内置了如下的编码器:
ProtobufEncoder:Protobuf 编码器;
MessageToByteEncoder:将 Java 对象编码成 ByteBuf;
MessageToMessageEncoder:如果不想将 Java 对象编码成 ByteBuf,而是自定义类就继承这个;
LengthFieldPrepender:LengthFieldPrepender 是一个非常实用的工具类,如果我们在发送消息的时候采用的是:消息长度字段+原始消息的形式,那么我们就可以使用 LengthFieldPrepender。这是因为 LengthFieldPrepender 可以将待发送消息的长度(二进制字节长度)写到 ByteBuf 的前两个字节。
*/
//处理粘包,要和服务器端发送消息对应,消息格式应该为:2字节消息内容长度+消息内容
pipeline.addLast(new LengthFieldBasedFrameDecoder(
1024, // 帧的最大长度,即每个数据包最大限度
0, // 长度字段偏移量
2, // 长度字段所占的字节数
0, // 长度的修正值,如果长度字段没有包含尾部所有长度,则这里为正,如果长度字段包含了长度自身和长度偏移量,则这里为负
2) // 需要忽略的字节数,从消息帧的第一字节开始算
);
// 客户端的逻辑
pipeline.addLast(“handler”, new TcpClientHandler());
}
}
- 最后实现自己通道的处理:
public class TcpClientHandler extends SimpleChannelInboundHandler {
private static final String TAG = “TcpClientHandler”;
private int counter;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收消息的处理
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, StandardCharsets.UTF_8);
//System.out.println(body + ” count:” + ++counter + “—-end—-\n”);
Log.w(TAG, body + ” count:” + ++counter + “—-end—-\n”);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Log.i(TAG, “client channelActive”);
//发送消息到服务端
for (int i = 0; i < 100; i++) {
byte[] req = (“我是一条测试消息,快来读我吧,啦啦啦” + i).getBytes();
ByteBuf message = Unpooled.buffer(req.length);
message.writeInt(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);// 发送客户端的请求
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Log.i(TAG, “Client is close”);
}
}