前言
特性
引入依赖
xml
<!-- Netty 依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<!-- Properties 读取 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
配置启动服务
java
@Log4j2
public class NettyServer {
// Netty 服务启动端口
private static final Integer port = 18080;
// 事件轮询线程组,处理客户端连接
private EventLoopGroup boss;
// 事件轮询线程组,用于处理数据读写
private EventLoopGroup workers;
/**
* 指定配置并启动 Netty 服务端
*/
public void init() throws InterruptedException {
// 声明服务启动引导
ServerBootstrap bootstrap = new ServerBootstrap();
// 实例化线程组,可以通过构造函数指定线程数量
boss = new NioEventLoopGroup();
workers = new NioEventLoopGroup(4);
try {
// 设置线程组
bootstrap.group(boss, workers)
// 设置 Netty 通道为 NIO 类型
.channel(NioServerSocketChannel.class)
// 设置连接缓冲池大小
.option(ChannelOption.SO_BACKLOG, 1024)
// 开启重用端口
.option(ChannelOption.SO_REUSEADDR, true)
// 关闭延迟发送
.hildOption(ChannelOption.TCP_NODELAY, true)
// 清除死链接,保持长连接状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 设置发送缓冲区大小
.childOption(ChannelOption.SO_SNDBUF, 256 * 1024)
// 设置接收缓冲区大小
.childOption(ChannelOption.SO_RCVBUF, 256 * 1024)
// 设置超时时间
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 6000)
// 设置内存池
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
// 设置建立连接的执行器()
.childHandler(new ServerInitializer());
// 绑定端口并阻塞监听,等待客户端连接
ChannelFuture future = bootstrap.bind(port).sync();
log.info("The server with port [{}] has been started successfully !", port);
// 对关闭通道进行阻塞监听
future.channel().closeFuture().sync();
} finally {
// 关闭资源
boss.shutdownGracefully();
workers.shutdownGracefully();
}
}
}
配置属性
可以将配置属性抽离 , 便于配置修改
定义配置实体
java
@Data
@ConfigurationProperties(prefix = "nio.config")
public class ServerProperties {
/**
* 服务端口,默认: 18080
*/
private Integer port = 18080;
/**
* 队列缓冲池,默认: 1024
*/
private Integer backlog = 1024;
/**
* 发送缓冲区大小,默认: 1024 * 1024
*/
private Integer sendBufferSize = 1024;
/**
* 接收缓冲区大小,默认: 1024 * 1024
*/
private Integer receiveBufferSize = 1024;
/**
* 保活机制,默认: true
*/
private Boolean keepalive = true;
/**
* 关闭延迟发送,默认: true
*/
private Boolean noDelay = true;
/**
* 复用本地地址,默认: true
*/
private Boolean reuseAddr = true;
/**
* 连接超时时间,默认: 5000
*/
private Integer connectTimeout = 5000;
}
配置修改
可通过 application.yml 文件进行属性配置
yaml
# 自定义属性配置
nio:
config:
port: 18081
backlog: 1024
send-buffer-size: 1024
receive-buffer-size: 1024
keepalive: true
no-delay: true
reuse-addr: true
connect-timeout: 5000
配置引用
将上面 启动服务 的配置项替换为 Properties
读取
java
@Component
@EnableConfigurationProperties(ServerProperties.class)
public class NettyServer {
private final ServerProperties properties;
// 注入 ServerProperties
public NettyServer(ServerProperties properties) {
this.properties = properties;
}
public void init() throws InterruptedException {
...
// 设置线程组
bootstrap.group(boss, workers)
// 设置 Netty 通道为 NIO 类型
.channel(NioServerSocketChannel.class)
// 替换为配置文件读取
.option(ChannelOption.SO_BACKLOG, properties.getBacklog())
.option(ChannelOption.SO_REUSEADDR, properties.getReuseAddr())
.childOption(ChannelOption.TCP_NODELAY, properties.getNoDelay())
.childOption(ChannelOption.SO_KEEPALIVE, properties.getKeepalive())
.childOption(ChannelOption.SO_SNDBUF, properties.getSendBufferSize())
.childOption(ChannelOption.SO_RCVBUF, properties.getReceiveBufferSize())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, properties.getConnectTimeout())
...
}
}
配置 pipeline 责任链
java
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
/**
* 将处理器注册到 EventLoop
*/
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 日志打印
pipeline.addLast(new LoggingHandler());
// 解码/编码
pipeline.addLast(new StringDecoder());
pipeline.addList(new StringEncoder());
// 接收到消息后的业务逻辑处理器
pipeline.addLast(new NettyServerHandler());
}
}
配置业务处理器
java
@ChannelHandler.Sharable
public class ServerHandler extends SimpleChannelInboundHandler<String> {
// 保留所有与服务器建立连接的 Channel 对象
public static ChannelGroup GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 当前 Channel 从客户端读取消息时调用。
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {}
/**
* 在 ChannelHandler 添加到实际上下文中并准备好处理事件后调用。
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {}
/**
* 在 ChannelHandler 从实际上下文中删除并且不再处理事件后调用。
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {}
/**
* 通道已向 EventLoop 注册时调用。
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {}
/**
* 通道已从 EventLoop 中注销时调用。
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {}
/**
* 通道现在处于活动状态时调用。
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {}
/**
* 通道现在处于非活动状态时调用。
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {}
/**
* 在读取的最后一条消息已被 channelRead() 消耗时调用。
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {}
/**
* 在触发用户事件时调用。
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {}
/**
* 在通道的可写状态更改后调用。
*/
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {}
/**
* 发生异常时调用。
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {}
}
其他处理器
空闲状态处理器
不共享
java
@Log4j2
public class ChannelIdleHandler extends IdleStateHandler {
public ChannelIdleHandler() {
super(60, 0, 0, TimeUnit.SECONDS);
}
/**
* 在触发用户事件 IdleStateEvent 调用。
*/
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent event) {
// 获取管道
Channel channel = ctx.channel();
ChannelId channelId = channel.id();
SocketAddress address = channel.remoteAddress();
// 获取空闲类型
IdleState idleState = event.state();
log.warn("{} -> 空闲连接将被关闭: ChannelID [{}] , Address [{}]", idleState, channelId, address);
// 关闭管道
channel.close();
}
}
解码/编码处理器
可自定义对象进行序列化
java
@ChannelHandler.Sharable
public class PacketCodecHandler extends MessageToMessageCodec<ByteBuf, CharSequence> {
/**
* 消息编码
*/
@Override
protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
// StringEncoder 默认实现
if (msg.length() == 0) {
return;
}
out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), StandardCharsets.UTF_8));
}
/**
* 消息解码
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
// StringDecoder 默认实现
out.add(msg.toString(StandardCharsets.UTF_8));
}
}
粘包解码器
名称 | 描述 |
---|---|
固定长度解码器 | |
行解码器 | |
长度字段解码器 | |
分隔符解码器 |