启动 example/EchoServer 项目报错解决
java: 程序包sun.misc不存在
修改 Project Structure 从 jdk11降低到 jdk1.8
java: 程序包io.netty.util.collection不存在
造成报错的原因是用于生成模板的 groovy 脚本没有成功执行
于是,手动执行 -DskipTests compile
应用 Netty 实现服务端和客户端
消息结构
Server.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
| /**
* 服务器端
* 4个编解码器 + 1个业务处理器 + 日志处理器
*/
public class Server {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
// 日志
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
NioEventLoopGroup boss = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
NioEventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
serverBootstrap.group(boss,worker);
// 参数设置
serverBootstrap.childOption(NioChannelOption.TCP_NODELAY,true);
serverBootstrap.childOption(NioChannelOption.SO_BACKLOG,1024);
MetricHandler metricHandler = new MetricHandler();
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
ChannelPipeline pipeline = nioSocketChannel.pipeline();
// 日志
pipeline.addLast("logHandler",new LoggingHandler(LogLevel.DEBUG));
// 统计连接数
pipeline.addLast("metricHandler", metricHandler);
// 请求
pipeline.addLast("frameDecoder",new OrderFrameDecoder());
pipeline.addLast("protocolDecoder",new OrderProtocolDecoder());
// 处理
pipeline.addLast("processHandler",new OrderServerProcessHandler());
// 响应
pipeline.addLast("protocolEncoder",new OrderProtocolEncoder());
pipeline.addLast("frameEncoder",new OrderFrameEncoder());
}
});
// 启动时阻塞
ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
channelFuture.channel().closeFuture().get();
}
}
|
Client.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
| /**
* 客户端
* 通过响应分发来改进
*/
public class ClientV2 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
// 调参
bootstrap.option(NioChannelOption.CONNECT_TIMEOUT_MILLIS,10*1000);
bootstrap.group(new NioEventLoopGroup());
RequestPendingCenter requestPendingCenter = new RequestPendingCenter();
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
ChannelPipeline pipeline = nioSocketChannel.pipeline();
pipeline.addLast("frameDecoder",new OrderFrameDecoder());
pipeline.addLast("frameEncoder",new OrderFrameEncoder());
pipeline.addLast("protocolEncoder",new OrderProtocolEncoder());
pipeline.addLast("protocolDecoder",new OrderProtocolDecoder());
// 响应式
pipeline.addLast("dispatcherHandler",new ResponseDispatcherHandler(requestPendingCenter));
pipeline.addLast("operationToRequestMessageEncoder",new OperationToRequestMessageEncoder());
pipeline.addLast("logHandler",new LoggingHandler(LogLevel.INFO));
}
});
// 启动时阻塞
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8090).sync();
long streamId = IdUtil.nextId();
OrderOperation orderOperation = new OrderOperation(1001, "土豆");
RequestMessage requestMessage = new RequestMessage(streamId,orderOperation);
OperationResultFuture operationResultFuture = new OperationResultFuture();
// 加到 map 中去
requestPendingCenter.add(streamId,operationResultFuture);
// 写入队列并发送
channelFuture.channel().writeAndFlush(requestMessage);
// 阻塞等待结果
OperationResult operationResult = operationResultFuture.get();
System.out.println("operationResult = " + operationResult);
channelFuture.channel().closeFuture().get();
}
}
|
调优参数
Linux 系统参数
ulimit -n [xxx]
:防止出现"Too many open file"报错。(重启或退出用户后失效)
SocketChannel
- SO_SNDBUF:TCP数据发送缓冲区大小
- SO_RCVBUF:TCP数据接收缓冲区大小
- SO_KEEPALIVE:TCP层keepalive,默认关闭
- SO_REUSEADDR:地址重用,多IP绑定相同端口,让关闭连接释放的端口更早可使用,默认不开启
- SO_LINGER:关闭Socket的延迟时间,默认不开启
- IP_TOS:设置IP头部的 Type-of-Service 字段,设定延时或吞吐量的优先级
- TCP_NODELAY:是否启用Nagle算法,默认不开启
serverBootstrap.childOption(NioChannelOption.TCP_NODELAY,true);
ServerSocketChannel
- SO_RCVBUF
- SO_REUSEADDR
- SO_BACKLOG:最大等待连接数
channel().bind(address,config.getBacklog());
,默认128serverBootstrap.childOption(NioChannelOption.SO_BACKLOG,1024);
ChennelOption
WRITE_UFFER_WATER_MARK:高低水位线,间接防止OOM(默认32k~64k)
CONNECT_TIMEOUT_MILLIS:客户端连接服务器最大允许时间,默认30s
bootstrap.option(NioChannelOption.CONNECT_TIMEOUT_MILLIS,10*1000);
MAX_MESSAGES_PER_READ:最大允许连续读次数
WRITE_SPIN_COUNT:最大允许连续写次数
ALLOCATOR:ByteBuf 分配器,默认池化、堆外
RCVBUF_ALLOCATOR:数据接收 ByteBuf 分配大小计算器+读次数控制器
AUTO_READ:是否监听读事件,默认 true
AUTO_CLOSE:是否在写失败时关闭连接,默认 true
MESSAGE_SIZE_ESTIMATOR:数据大小计算器 byteBuf.readableBytes()
SINGLE_EVENTEXECUTOR_PER_GROUP:当增加 handler 且指定 EventExecutorGroup 时,决定这个 handler 是否只用其中的一个固定的 EventExecutor,默认 true
ALLOW_HALF_CLOSURE:是否允许半关闭连接,默认 false
实现关闭通道后还能收到未发送完的数据
易诊断性
完善对象名称
线程名称
NioEventLoopGroup boss = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
handler 名称
pipeline.addLast("frameDecoder",new OrderFrameDecoder());
集成日志
netty 使用日志的优先级 slf4j > log4j > log4j2 > jdkLogger
原理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| // InternalLoggerFactory.java
// slf4j > log4j > log4j2 > jdkLogger
private static InternalLoggerFactory newDefaultFactory(String name) {
InternalLoggerFactory f;
try {
f = new Slf4JLoggerFactory(true);
f.newInstance(name).debug("Using SLF4J as the default logging framework");
} catch (Throwable ignore1) {
try {
f = Log4JLoggerFactory.INSTANCE;
f.newInstance(name).debug("Using Log4J as the default logging framework");
} catch (Throwable ignore2) {
try {
f = Log4J2LoggerFactory.INSTANCE;
f.newInstance(name).debug("Using Log4J2 as the default logging framework");
} catch (Throwable ignore3) {
f = JdkLoggerFactory.INSTANCE;
f.newInstance(name).debug("Using java.util.logging as the default logging framework");
}
}
}
return f;
}
|
使用
pom.xml
1
2
3
4
5
| <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
|
server.java
1
2
3
4
5
6
7
| serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
ChannelPipeline pipeline = nioSocketChannel.pipeline();
pipeline.addLast("logHandler",new LoggingHandler(LogLevel.DEBUG));
}
});
|
resources/log4j.properties
1
2
3
4
5
6
7
| ### 设置###
log4j.rootLogger = debug,stdout
### 输出信息到控制台 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
|
统计并展示当前系统连接数
pom.xml
1
2
3
4
5
6
7
8
9
10
11
| <!-- metrics 度量当前系统连接数-->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>4.2.19</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jmx</artifactId>
<version>4.2.19</version>
</dependency>
|
MetricHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
| /**
* 统计并展示当前系统连接数
*/
@ChannelHandler.Sharable
public class MetricHandler extends ChannelDuplexHandler {
private AtomicLong totalConnectionNumber = new AtomicLong();
// 将信息以 Gauge 的形式 注册给 MetricRegistry
{
MetricRegistry metricRegistry = new MetricRegistry();
metricRegistry.register("totalConnectionNumber", new Gauge<Long>() {
@Override
public Long getValue() {
return totalConnectionNumber.longValue();
}
});
// console 每 5s 打印
ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metricRegistry).build();
consoleReporter.start(5, TimeUnit.SECONDS);
// jconsole 实时展示
JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
jmxReporter.start();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
totalConnectionNumber.incrementAndGet();
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
totalConnectionNumber.decrementAndGet();
super.channelInactive(ctx);
}
}
|
server.java
1
2
3
4
5
6
7
8
9
| MetricHandler metricHandler = new MetricHandler();
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
ChannelPipeline pipeline = nioSocketChannel.pipeline();
// 统计连接数
pipeline.addLast("metricHandler", metricHandler);
}
});
|
内存泄漏检测
ResourceLeakDetector
级别
1
2
3
4
5
6
| public enum Level {
DISABLED, // 关闭
SIMPLE, // 抽样,是否泄露 (默认)
ADVANCED, // 抽样,具体泄露地方
PARANOID; // 全样本
}
|
修改 VM options:-Dio.netty.leakDetection.level=PARANOID
弱引用
将 DefaultResourceLeak 放入 ReferenceQueue
判断时机:弱引用指向对象被 GC 回收时,遍历弱引用队列 ReferenceQueue
记录访问信息
Record extends Thowable
触发汇报时机
AbstractByteBufAllocator#buffer()
Netty 注解
- Sharable:标识 handler 提醒可共享,否则不能重复加入 pipeline。如果没有需要竞争的资源,就可以共享。
- Skip:跳过 handler 的执行
- UnstableApi:标识不稳定的 Api
pipeline 优化
线程模型优化
CPU 密集型
- Runtime.getRuntime().availableProcessors() * 2
- io.netty.availableProcessors * 2【考虑 docker】
- io.netty.eventLoopThreads
IO 密集型:独立出线程池来处理业务
增强写
pipeline.addLast("flushEnhance",new FlushConsolidationHandler(5,true));
流量整形
pipeline.addLast("TSHandler", globalTrafficShapingHandler);
Linux 下开启 native
- 准备好 native 库
- 加载顺序:java.library.path 优先于 META-INF/native/
防止 OOM
1
2
3
4
5
| if (channelHandlerContext.channel().isActive() && channelHandlerContext.channel().isWritable()){
channelHandlerContext.writeAndFlush(responseMessage);
}else {
log.error("message dropped");
}
|
空闲监测:keepalive 和 idle
位于 TCP 层,不同于 HTTP的 keep-alive(默认于 HTTP/1.1 支持),idle 配合监测:
- 减少 keepalive 消息
- 直接关闭连接,防止拒绝服务攻击
1
2
3
4
5
6
7
| // server 端开启 TCP keepalive
// 在很多的 if-else 语句之中
bootstrap.childOption(ChannelOption.SO_KEEPALIVE,true);
// 以 key-value 方式加入参数(推荐)
bootstrap.childOption(NioChannelOption.of(StandardSocketOptions.SO_KEEPALIVE),true);
// 开启 idel check,对应于 读、写、all 三个时间
pipeline().addLast("idleCheckHandler",new IdleStateHandler(0,20,0,TimeUnit.SECONDS));
|
服务端
ServerIdleCheckHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| @Slf4j
public class ServerIdleCheckHandler extends IdleStateHandler {
public ServerIdleCheckHandler() {
super(10, 0, 0, TimeUnit.SECONDS);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
if (evt == IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT){
log.info("idle check happen, so close the connection");
ctx.close();
return;
}
super.channelIdle(ctx, evt);
}
}
|
Server.java
1
2
| // Idle监测
pipeline.addLast("idleCheck",new ServerIdleCheckHandler());
|
客户端
ClientIdleCheckHandler.java
1
2
3
4
5
| public class ClientIdleCheckHandler extends IdleStateHandler {
public ClientIdleCheckHandler() {
super(0,5,0);
}
}
|
KeepaliveHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| @Slf4j
@ChannelHandler.Sharable
public class KeepaliveHandler extends ChannelDuplexHandler {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt== IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT){
log.info("write idle happen, so need to send keepalive to keep connection not closed");
KeepaliveOperation keepaliveOperation = new KeepaliveOperation();
RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(),keepaliveOperation);
ctx.writeAndFlush(requestMessage);
}
super.userEventTriggered(ctx, evt);
}
}
|
Client.java
1
2
3
4
5
6
7
8
9
10
| // Sharable
KeepaliveHandler keepaliveHandler = new KeepaliveHandler();
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
ChannelPipeline pipeline = nioSocketChannel.pipeline();
pipeline.addLast("keepalive",keepaliveHandler);
}
});
|
黑白名单
Server.java
1
2
3
4
5
6
| // 黑白名单 Sharable
final RuleBasedIpFilter ruleBasedIpFilter = new RuleBasedIpFilter(
// 基于 CIDR 前缀,以下表示禁止以 127.1 开头的 ip 地址
new IpSubnetFilterRule("127.1.0.1", 16, IpFilterRuleType.REJECT));
...
pipeline.addLast("ipFilter",ruleBasedIpFilter);
|
认证
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| /**
* 自定义认证
* 若认证成功,移除认证 handler;若认证失败,关闭连接,也不需要该 handler 了
*/
@Slf4j
@ChannelHandler.Sharable
public class AuthHandler extends SimpleChannelInboundHandler<RequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RequestMessage msg) throws Exception {
try {
Operation operation = msg.getMessageBody();
if (operation instanceof AuthOperation) {
// 认证操作
AuthOperation authOperation = AuthOperation.class.cast(operation);
AuthOperationResult authOperationResult = authOperation.execute();
if (authOperationResult.isPassAuth()) {
log.info("pass auth");
} else {
log.error("fail to auth");
ctx.close();
}
} else {
log.error("expect first msg is auth");
ctx.close();
}
} catch (Exception e) {
log.error("exception happen: " + e.getMessage());
ctx.close();
} finally {
ctx.pipeline().remove(this);
}
}
}
|
加入 pipeline 同理,Sharable 的 handler 对象提前定义,但是在顺序上需要注意,必须放在 protocolDecoder 之后,因为它需要的参数不是 ByteBuf,而是 RequestMessage 对象,如下:
1
2
3
4
5
| // 请求
pipeline.addLast("frameDecoder",new OrderFrameDecoder());
pipeline.addLast("protocolDecoder",new OrderProtocolDecoder());
// 认证, 必须放在 protocolDecoder 之后
pipeline.addLast("ipFilter",authHandler);
|
SSL/TLS 协议
在传输层上封装了应用层协议,在不需要修改应用层协议的前提下提供安全保障。
非对称加密:公钥放在证书,私钥通过三个随机数生成。
抓包工具:WireShark
Server.java
1
2
3
4
5
6
| // SSL 自签证书
SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
// 证书路径,用于客户端导入
log.info(selfSignedCertificate.certificate().toString());
SslContext sslContext = SslContextBuilder.forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey()).build();
SslHandler sslHandler = sslContext.newHandler(nioSocketChannel.alloc());
|
Client.java
1
| SslContext sslContext = SslContextBuilder.forClient().build();
|
Netty 的应用
Dubbo
- 利用 Idle 机制,在读写都空闲时,触发心跳。
- 使用 socks5 代替 http 协议,能代理 TCP、UDP。
Hadoop
- HTTP 服务构建
- 用 filter 来阻止 CSRF 跨站请求伪造
- 分块写(chucked write)
开源贡献注意事项
- 代码风格和项目保持一致
Deprecated
要写注释,建议用哪个方法或类来替代- 功能性提升,要做单元测试;性能优化,要做性能测试(benchmark)
- 400 行以内为一个 PR,作为一次 Code Review
参考资料