Netty 的编码 解码 案例
0x01:半包粘包
例如发送两个数据包给服务器,由于服务端一次读取到的字节数不一定的分
没有半包和拆包:服务器分两次读取到两个地理的数据包,这个情况没有拆包和粘包的情况
粘包:服务器一次收到两个数据包,在一起收到的
拆包:第一次读取到完成的第一个包和第二个包的一部分内容,第二次读取到第二个包的剩余内容
整包:第一次读取到第一包的部分内容,第二次读取到第一个包的剩余部分和第二个包的全部
多次拆包:如果接收滑窗非常小,数据量大的时候发生多次发送的接收的情况
为什么会出现半包和粘包
1、HTTP 中有一个 Nagle 算法,每个报文都是一段的,使用网络发送发现网络效率低,然后 HTTP 设置一个算法,设置到一定程度发,所以出现一些延时,提高销量,所以形成了粘包
2、HTTP缓冲区引起的,报文段大的时候的时候直接弄在一起发送过去。
怎么解决
不断的从 TCP 的缓冲区中读取数据,每次读取完成都需要判断是否是一个完整的数据包
如果是读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从 TCP 缓冲区中读取,直到得到一个完整的数据包
定长
分隔符
基于长度的变长包
如果当前督导的数据加上已经读取到的数据足以拼接成一个数据包,那就讲已经读取的数据拼接本次读取的数据,构成一个完整的业务数据包传递到业务逻辑上,多余的数据保留,方便下次的读取或者数据链接。
0x02:Netty常用的编码器
LineBasedFrameDecoder
回车换行编码器
配合StringDecoder
DelimiterBasedFrameDecoder
分隔符解码器
FixedLengthFrameDecoder
固定长度解码器
LengthFieldBasedFrameDecoder
不能超过1024个字节不然会报错
基于'长度'解码器(私有协议最常用)
0x03:拆包的类
ByteToMessageDecoder
自解析
LengthFieldPrepender
长度编码器
Netty拆包的基类 - ByteToMessageDecoder
内部维护了一个数据累积器cumulation,每次读取到数据都会不断累加,然后尝试对累加到
的数据进行拆包,拆成一个完整的业务数据包
每次都将读取到的数据通过内存拷贝的方式, 累积到cumulation中
调用子类的 decode 方法对累积的数据尝试进行拆包
LengthFieldBasedFrameDecoder
参数说明
maxFrameLength:包的最大长度
lengthFieldOffset:长度属性的起始位(偏移位),包中存放长度属性字段的起始位置
lengthFieldLength:长度属性的长度
lengthAdjustment:长度调节值,在总长被定义为包含包头长度时,修正信息长度
initialBytesToStrip:跳过的字节数,根据需要跳过lengthFieldLength个字节,以便接收端直接接受到不含“长度属性”的内容
LengthFieldPrepender 编码器
参数说明
lengthFieldLength:长度属性的字节长度
lengthIncludesLengthFieldLength:false,长度字节不算在总长度中,true,算到总长度中
编解码器的作用就是讲原始字节数据与自定义的消息对象进行互转
Decoder(解码器)
Encoder(编码器)
支持业界主流的序列化框架
Protobuf
Jboss Marshalling
Java Serialization
解码1拆包:把整个 ByteBuf 数据,分成一个个 ByteBuf,每个表示一个包
解码2反序列化:把每个包的 ByteBuf 字节数组转成 java object
package com.demo;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
public class StickyDemoClient {
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
}
}
new StickyDemoClient().connect(port, "127.0.0.1");
}
public void connect(int port, String host) throws Exception {
// 工作线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//ch.pipeline().addLast("framer", new FixedLengthFrameDecoder(139));
// ch.pipeline().addLast("framer", new StickyDemoDecodeHandlerV2(
// Unpooled.wrappedBuffer(new byte[] { '#' })));
ch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(8192,
Unpooled.wrappedBuffer(new byte[] { '#' })));
ch.pipeline().addLast(new StickyDemoClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
// 等待客户端链路关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放线程池资源
group.shutdownGracefully();
}
}
}
package com.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class StickyDemoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static String[] alphabets = {"A", "B", "C", "D", "E", "F", "G", "H", "I",
"J", "K", "L", "M", "N", "O", "P"};
@Override
public void channelActive(ChannelHandlerContext ctx) {
for(int i=0; i<10; i++) {
StringBuilder builder = new StringBuilder();
builder.append("这是第");
builder.append(i);
builder.append("条消息, 内容是:");
for(int j=0; j<100; j++) {
builder.append(alphabets[i]);
}
builder.append("......");
builder.append("#");
System.out.println(builder.toString().getBytes().length);
ctx.writeAndFlush(Unpooled.copiedBuffer(builder.toString(),
CharsetUtil.UTF_8));
}
}
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
System.out.println("客户端接收到消息:" + in.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
package com.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.ArrayList;
import java.util.List;
public class StickyDemoDecodeHandler extends ChannelInboundHandlerAdapter {
//存放待拆包数据的缓冲区
private ByteBuf cache;
private int frameLength;
public StickyDemoDecodeHandler(int length) {
this.frameLength = length;
}
static ByteBuf expandCache(ByteBufAllocator alloc, ByteBuf cache, int readable) {
ByteBuf oldCache = cache;
cache = alloc.buffer(oldCache.readableBytes() + readable);
cache.writeBytes(oldCache);
oldCache.release();
return cache;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf data = (ByteBuf) msg;
try {
//读取每一个消息,创建缓冲区
if (cache == null) {
cache = ctx.alloc().buffer(1024);
} else {
//如果现有的缓冲区容量太小,无法容纳原有数据+新读入的数据,就扩容(重新创建一个大的,并把数据拷贝过去)
if (cache.writerIndex() > cache.maxCapacity() - data.readableBytes()) {
cache = expandCache(ctx.alloc(), cache, data.readableBytes());
}
}
//把新的数据读入缓冲区
cache.writeBytes(data);
//每次读取frameLength(定长)的数据,做为一个包,存储起来
List<ByteBuf> output = new ArrayList<>();
while (cache.readableBytes() >= frameLength) {
output.add(cache.readBytes(frameLength));
}
//还有部分数据不够一个包,10, 15, 一个10个,还剩5个
if (cache.isReadable()) {
cache.discardReadBytes();
}
for (int i = 0; i < output.size(); i++) {
ctx.fireChannelRead(output.get(i));
}
} finally {
data.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
package com.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.ArrayList;
import java.util.List;
public class StickyDemoDecodeHandlerV2 extends ChannelInboundHandlerAdapter {
private ByteBuf cache;
private byte delimiter; //包分隔符
public StickyDemoDecodeHandlerV2(ByteBuf delimiter) {
if (delimiter == null) {
throw new NullPointerException("delimiter");
}
if (!delimiter.isReadable()) {
throw new IllegalArgumentException("empty delimiter");
}
this.delimiter = delimiter.readByte();
;
}
static ByteBuf expandCache(ByteBufAllocator alloc, ByteBuf cache, int readable) {
ByteBuf oldCache = cache;
cache = alloc.buffer(oldCache.readableBytes() + readable);
cache.writeBytes(oldCache);
oldCache.release();
return cache;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf data = (ByteBuf) msg;
try {
if (cache == null) {
cache = ctx.alloc().buffer(1024);
} else {
if (cache.writerIndex() > cache.maxCapacity() - data.readableBytes()) {
cache = expandCache(ctx.alloc(), cache, data.readableBytes());
}
}
cache.writeBytes(data);
List<ByteBuf> output = new ArrayList<>();
int frameIndex = 0;
int frameEndIndex = 0;
int length = cache.readableBytes();
while (frameIndex <= length) {
frameEndIndex = cache.indexOf(frameIndex + 1, length, delimiter);
if (frameEndIndex == -1) {
cache.discardReadBytes();
break;
}
output.add(cache.readBytes(frameEndIndex - frameIndex));
cache.skipBytes(1);
frameIndex = frameEndIndex + 1;
}
if (cache.isReadable()) {
cache.discardReadBytes();
}
for (int i = 0; i < output.size(); i++) {
ctx.fireChannelRead(output.get(i));
}
} finally {
data.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
package com.demo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
public class StickyDemoServer {
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
new StickyDemoServer().bind(port);
}
public void bind(int port) throws Exception {
// 第一步:
// 配置服务端的NIO线程组
// 主线程组, 用于接受客户端的连接,但是不做任何具体业务处理,像老板一样,负责接待客户,不具体服务客户
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 工作线程组, 老板线程组会把任务丢给他,让手下线程组去做任务,服务客户
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 类ServerBootstrap用于配置Server相关参数,并启动Server
ServerBootstrap b = new ServerBootstrap();
// 链式调用
// 配置parentGroup和childGroup
b.group(bossGroup, workerGroup)
// 配置Server通道
.channel(NioServerSocketChannel.class)
// 配置通道的ChannelPipeline
.childHandler(new ChildChannelHandler());
// 绑定端口,并启动server,同时设置启动方式为同步
ChannelFuture f = b.bind(port).sync();
System.out.println(StickyDemoServer.class.getName() + " 启动成功,在地址[" + f.channel().localAddress() + "]上等待客户请求......");
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//ch.pipeline().addLast("framer", new FixedLengthFrameDecoder(139));
ch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(8192, Unpooled.wrappedBuffer(new byte[] { '#' })));
//ch.pipeline().addLast("framer", new StickyDemoDecodeHandler(139));
// ch.pipeline().addLast("framer", new StickyDemoDecodeHandlerV2(
// Unpooled.wrappedBuffer(new byte[] { '#' })));
ch.pipeline().addLast(new StickyDemoServerHandler());
}
}
}
package com.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class StickyDemoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
System.out.println(
"服务器接收到消息:" + in.toString(CharsetUtil.UTF_8));
ctx.write(in);
// ctx.write(Unpooled.copiedBuffer("#", CharsetUtil.UTF_8));
//compositeBuffer.addComponent(in);
// ByteBuf buf = ctx.alloc().directBuffer();
// buf.writeBytes("#".getBytes());
// CompositeByteBuf compositeBuffer = ctx.alloc().compositeBuffer();
// compositeBuffer.addComponents(true, in, buf);
// ctx.write(compositeBuffer);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
source:https://www.yuque.com/yangxinlei/lodfss/nguvm0
喜欢,在看