Netty 实践记录

启动 example/EchoServer 项目报错解决

java: 程序包sun.misc不存在

修改 Project Structure 从 jdk11降低到 jdk1.8

java: 程序包io.netty.util.collection不存在

造成报错的原因是用于生成模板的 groovy 脚本没有成功执行

于是,手动执行 -DskipTests compile

应用 Netty 实现服务端和客户端

消息结构

Server.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
 * 服务器端
 * 4个编解码器 + 1个业务处理器 + 日志处理器
 */
public class Server {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.channel(NioServerSocketChannel.class);

        // 日志
        serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));

        NioEventLoopGroup boss = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
        NioEventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));

        serverBootstrap.group(boss,worker);


        // 参数设置
        serverBootstrap.childOption(NioChannelOption.TCP_NODELAY,true);
        serverBootstrap.childOption(NioChannelOption.SO_BACKLOG,1024);

        MetricHandler metricHandler = new MetricHandler();

        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                ChannelPipeline pipeline = nioSocketChannel.pipeline();
                // 日志
                pipeline.addLast("logHandler",new LoggingHandler(LogLevel.DEBUG));
                // 统计连接数
                pipeline.addLast("metricHandler", metricHandler);

                // 请求
                pipeline.addLast("frameDecoder",new OrderFrameDecoder());
                pipeline.addLast("protocolDecoder",new OrderProtocolDecoder());
                // 处理
                pipeline.addLast("processHandler",new OrderServerProcessHandler());
                // 响应
                pipeline.addLast("protocolEncoder",new OrderProtocolEncoder());
                pipeline.addLast("frameEncoder",new OrderFrameEncoder());

            }
        });
        // 启动时阻塞
        ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
        channelFuture.channel().closeFuture().get();
    }
}

Client.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
 * 客户端
 * 通过响应分发来改进
 */
public class ClientV2 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);

        // 调参
        bootstrap.option(NioChannelOption.CONNECT_TIMEOUT_MILLIS,10*1000);

        bootstrap.group(new NioEventLoopGroup());

         RequestPendingCenter requestPendingCenter = new RequestPendingCenter();

        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                ChannelPipeline pipeline = nioSocketChannel.pipeline();
                pipeline.addLast("frameDecoder",new OrderFrameDecoder());
                pipeline.addLast("frameEncoder",new OrderFrameEncoder());
                pipeline.addLast("protocolEncoder",new OrderProtocolEncoder());
                pipeline.addLast("protocolDecoder",new OrderProtocolDecoder());
                // 响应式
                pipeline.addLast("dispatcherHandler",new ResponseDispatcherHandler(requestPendingCenter));
                pipeline.addLast("operationToRequestMessageEncoder",new OperationToRequestMessageEncoder());
                pipeline.addLast("logHandler",new LoggingHandler(LogLevel.INFO));
            }
        });
        // 启动时阻塞
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8090).sync();
        
        long streamId = IdUtil.nextId();
        OrderOperation orderOperation = new OrderOperation(1001, "土豆");
        RequestMessage requestMessage = new RequestMessage(streamId,orderOperation);

         OperationResultFuture operationResultFuture = new OperationResultFuture();
         // 加到 map 中去
         requestPendingCenter.add(streamId,operationResultFuture);
         // 写入队列并发送
        channelFuture.channel().writeAndFlush(requestMessage);
        // 阻塞等待结果
        OperationResult operationResult = operationResultFuture.get();
        System.out.println("operationResult = " + operationResult);
        channelFuture.channel().closeFuture().get();
    }
}

调优参数

Linux 系统参数

ulimit -n [xxx]:防止出现"Too many open file"报错。(重启或退出用户后失效)

SocketChannel

  • SO_SNDBUF:TCP数据发送缓冲区大小
  • SO_RCVBUF:TCP数据接收缓冲区大小
  • SO_KEEPALIVE:TCP层keepalive,默认关闭
  • SO_REUSEADDR:地址重用,多IP绑定相同端口,让关闭连接释放的端口更早可使用,默认不开启
  • SO_LINGER:关闭Socket的延迟时间,默认不开启
  • IP_TOS:设置IP头部的 Type-of-Service 字段,设定延时或吞吐量的优先级
  • TCP_NODELAY:是否启用Nagle算法,默认不开启
    • serverBootstrap.childOption(NioChannelOption.TCP_NODELAY,true);

ServerSocketChannel

  • SO_RCVBUF
  • SO_REUSEADDR
  • SO_BACKLOG:最大等待连接数 channel().bind(address,config.getBacklog());,默认128
    • serverBootstrap.childOption(NioChannelOption.SO_BACKLOG,1024);

ChennelOption

  • WRITE_UFFER_WATER_MARK:高低水位线,间接防止OOM(默认32k~64k)

  • CONNECT_TIMEOUT_MILLIS:客户端连接服务器最大允许时间,默认30s

    • bootstrap.option(NioChannelOption.CONNECT_TIMEOUT_MILLIS,10*1000);
  • MAX_MESSAGES_PER_READ:最大允许连续读次数

  • WRITE_SPIN_COUNT:最大允许连续写次数

  • ALLOCATOR:ByteBuf 分配器,默认池化、堆外

  • RCVBUF_ALLOCATOR:数据接收 ByteBuf 分配大小计算器+读次数控制器

  • AUTO_READ:是否监听读事件,默认 true

  • AUTO_CLOSE:是否在写失败时关闭连接,默认 true

  • MESSAGE_SIZE_ESTIMATOR:数据大小计算器 byteBuf.readableBytes()

  • SINGLE_EVENTEXECUTOR_PER_GROUP:当增加 handler 且指定 EventExecutorGroup 时,决定这个 handler 是否只用其中的一个固定的 EventExecutor,默认 true

  • ALLOW_HALF_CLOSURE:是否允许半关闭连接,默认 false

    • 实现关闭通道后还能收到未发送完的数据

易诊断性

完善对象名称

线程名称

NioEventLoopGroup boss = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));

handler 名称

pipeline.addLast("frameDecoder",new OrderFrameDecoder());

集成日志

netty 使用日志的优先级 slf4j > log4j > log4j2 > jdkLogger

原理
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// InternalLoggerFactory.java
// slf4j > log4j > log4j2 > jdkLogger
    private static InternalLoggerFactory newDefaultFactory(String name) {
        InternalLoggerFactory f;
        try {
            f = new Slf4JLoggerFactory(true);
            f.newInstance(name).debug("Using SLF4J as the default logging framework");
        } catch (Throwable ignore1) {
            try {
                f = Log4JLoggerFactory.INSTANCE;
                f.newInstance(name).debug("Using Log4J as the default logging framework");
            } catch (Throwable ignore2) {
                try {
                    f = Log4J2LoggerFactory.INSTANCE;
                    f.newInstance(name).debug("Using Log4J2 as the default logging framework");
                } catch (Throwable ignore3) {
                    f = JdkLoggerFactory.INSTANCE;
                    f.newInstance(name).debug("Using java.util.logging as the default logging framework");
                }
            }
        }
        return f;
    }
                    
使用

pom.xml

1
2
3
4
5
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-log4j12</artifactId>
  <version>1.7.25</version>
</dependency>

server.java

1
2
3
4
5
6
7
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                ChannelPipeline pipeline = nioSocketChannel.pipeline();
                pipeline.addLast("logHandler",new LoggingHandler(LogLevel.DEBUG));
            }
 });

resources/log4j.properties

1
2
3
4
5
6
7
### 设置###
log4j.rootLogger = debug,stdout
### 输出信息到控制台 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

统计并展示当前系统连接数

pom.xml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
        <!-- metrics 度量当前系统连接数-->
        <dependency>
            <groupId>io.dropwizard.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>4.2.19</version>
        </dependency>
        <dependency>
            <groupId>io.dropwizard.metrics</groupId>
            <artifactId>metrics-jmx</artifactId>
            <version>4.2.19</version>
        </dependency>

MetricHandler.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
 * 统计并展示当前系统连接数
 */
@ChannelHandler.Sharable
public class MetricHandler extends ChannelDuplexHandler {
    private AtomicLong totalConnectionNumber = new AtomicLong();

    // 将信息以 Gauge 的形式 注册给 MetricRegistry
    {
        MetricRegistry metricRegistry = new MetricRegistry();
        metricRegistry.register("totalConnectionNumber", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return totalConnectionNumber.longValue();
            }
        });

        // console 每 5s 打印
        ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metricRegistry).build();
        consoleReporter.start(5, TimeUnit.SECONDS);

        // jconsole 实时展示
        JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
        jmxReporter.start();

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        totalConnectionNumber.incrementAndGet();
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        totalConnectionNumber.decrementAndGet();
        super.channelInactive(ctx);
    }
}

server.java

1
2
3
4
5
6
7
8
9
MetricHandler metricHandler = new MetricHandler();
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
  @Override
  protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
    ChannelPipeline pipeline = nioSocketChannel.pipeline();
    // 统计连接数
    pipeline.addLast("metricHandler", metricHandler);
  }
});

内存泄漏检测

ResourceLeakDetector

级别
1
2
3
4
5
6
public enum Level {
  DISABLED, // 关闭
  SIMPLE, // 抽样,是否泄露 (默认)
  ADVANCED, // 抽样,具体泄露地方
  PARANOID; // 全样本
}

修改 VM options:-Dio.netty.leakDetection.level=PARANOID

弱引用

将 DefaultResourceLeak 放入 ReferenceQueue

判断时机:弱引用指向对象被 GC 回收时,遍历弱引用队列 ReferenceQueue

记录访问信息

Record extends Thowable

触发汇报时机

AbstractByteBufAllocator#buffer()

Netty 注解

  • Sharable:标识 handler 提醒可共享,否则不能重复加入 pipeline。如果没有需要竞争的资源,就可以共享。
  • Skip:跳过 handler 的执行
  • UnstableApi:标识不稳定的 Api

pipeline 优化

线程模型优化

  • CPU 密集型

    • Runtime.getRuntime().availableProcessors() * 2
    • io.netty.availableProcessors * 2【考虑 docker】
    • io.netty.eventLoopThreads
  • IO 密集型:独立出线程池来处理业务

    • 在 handler 内部使用 JDK Executors

    • 添加 handler 时指定 EventExecutorGroup,和 handler 参数一起放入 pipeline 中

      • 1
        2
        3
        4
        5
        6
        7
        8
        9
        
        UnorderedThreadPoolEventExecutor businessExecutor = new UnorderedThreadPoolEventExecutor(10, new DefaultThreadFactory("business"));
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
          @Override
          protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
            ChannelPipeline pipeline = nioSocketChannel.pipeline();
            // 处理:IO 密集型,独立线程池
            pipeline.addLast(businessExecutor,"business",new OrderServerProcessHandler());
          }
        });
      • 为什么用 UnorderedThreadPoolEventExecutor 而不是 NioEventLoopGroup?

      • 因为 NioEventLoopGroup 在 ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP 参数的作用下, next() 方法返回 this,默认只启用一个线程。

增强写

pipeline.addLast("flushEnhance",new FlushConsolidationHandler(5,true));

流量整形

pipeline.addLast("TSHandler", globalTrafficShapingHandler);

Linux 下开启 native

  1. 准备好 native 库
  2. 加载顺序:java.library.path 优先于 META-INF/native/

防止 OOM

1
2
3
4
5
if (channelHandlerContext.channel().isActive() && channelHandlerContext.channel().isWritable()){
  channelHandlerContext.writeAndFlush(responseMessage);
}else {
  log.error("message dropped");
}

空闲监测:keepalive 和 idle

位于 TCP 层,不同于 HTTP的 keep-alive(默认于 HTTP/1.1 支持),idle 配合监测:

  1. 减少 keepalive 消息
  2. 直接关闭连接,防止拒绝服务攻击
1
2
3
4
5
6
7
// server 端开启 TCP keepalive
// 在很多的 if-else 语句之中
bootstrap.childOption(ChannelOption.SO_KEEPALIVE,true);
// 以 key-value 方式加入参数(推荐)
bootstrap.childOption(NioChannelOption.of(StandardSocketOptions.SO_KEEPALIVE),true);
// 开启 idel check,对应于 读、写、all 三个时间
pipeline().addLast("idleCheckHandler",new IdleStateHandler(0,20,0,TimeUnit.SECONDS));
服务端

ServerIdleCheckHandler.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
@Slf4j
public class ServerIdleCheckHandler extends IdleStateHandler {
    public ServerIdleCheckHandler() {
        super(10, 0, 0, TimeUnit.SECONDS);
    }

    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        if (evt == IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT){
            log.info("idle check happen, so close the connection");
            ctx.close();
            return;
        }
        super.channelIdle(ctx, evt);
    }
}

Server.java

1
2
// Idle监测
pipeline.addLast("idleCheck",new ServerIdleCheckHandler());
客户端

ClientIdleCheckHandler.java

1
2
3
4
5
public class ClientIdleCheckHandler extends IdleStateHandler {
    public ClientIdleCheckHandler() {
        super(0,5,0);
    }
}

KeepaliveHandler.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@Slf4j
@ChannelHandler.Sharable
public class KeepaliveHandler extends ChannelDuplexHandler {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt== IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT){
            log.info("write idle happen, so need to send keepalive to keep connection not closed");
            KeepaliveOperation keepaliveOperation = new KeepaliveOperation();
            RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(),keepaliveOperation);
            ctx.writeAndFlush(requestMessage);
        }
        super.userEventTriggered(ctx, evt);
    }
}

Client.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// Sharable
KeepaliveHandler keepaliveHandler = new KeepaliveHandler();

bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
  @Override
  protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
    ChannelPipeline pipeline = nioSocketChannel.pipeline();
    pipeline.addLast("keepalive",keepaliveHandler);
  }
});

黑白名单

Server.java

1
2
3
4
5
6
// 黑白名单 Sharable
final RuleBasedIpFilter ruleBasedIpFilter = new RuleBasedIpFilter(
  // 基于 CIDR 前缀,以下表示禁止以 127.1 开头的 ip 地址
  new IpSubnetFilterRule("127.1.0.1", 16, IpFilterRuleType.REJECT));
...
  pipeline.addLast("ipFilter",ruleBasedIpFilter);

认证

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * 自定义认证
 * 若认证成功,移除认证 handler;若认证失败,关闭连接,也不需要该 handler 了
 */
@Slf4j
@ChannelHandler.Sharable
public class AuthHandler extends SimpleChannelInboundHandler<RequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RequestMessage msg) throws Exception {
        try {
            Operation operation = msg.getMessageBody();
            if (operation instanceof AuthOperation) {
                // 认证操作
                AuthOperation authOperation = AuthOperation.class.cast(operation);
                AuthOperationResult authOperationResult = authOperation.execute();
                if (authOperationResult.isPassAuth()) {
                    log.info("pass auth");
                } else {
                    log.error("fail to auth");
                    ctx.close();
                }
            } else {
                log.error("expect first msg is auth");
                ctx.close();
            }
        } catch (Exception e) {
            log.error("exception happen: " + e.getMessage());
            ctx.close();
        } finally {
            ctx.pipeline().remove(this);
        }
    }
}

加入 pipeline 同理,Sharable 的 handler 对象提前定义,但是在顺序上需要注意,必须放在 protocolDecoder 之后,因为它需要的参数不是 ByteBuf,而是 RequestMessage 对象,如下:

1
2
3
4
5
// 请求
pipeline.addLast("frameDecoder",new OrderFrameDecoder());
pipeline.addLast("protocolDecoder",new OrderProtocolDecoder());
// 认证, 必须放在 protocolDecoder 之后
pipeline.addLast("ipFilter",authHandler);

SSL/TLS 协议

在传输层上封装了应用层协议,在不需要修改应用层协议的前提下提供安全保障。

非对称加密:公钥放在证书,私钥通过三个随机数生成。

抓包工具:WireShark

Server.java

1
2
3
4
5
6
// SSL 自签证书
SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
// 证书路径,用于客户端导入
log.info(selfSignedCertificate.certificate().toString());
SslContext sslContext = SslContextBuilder.forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey()).build();
SslHandler sslHandler = sslContext.newHandler(nioSocketChannel.alloc());

Client.java

1
SslContext sslContext = SslContextBuilder.forClient().build();

Netty 的应用

Dubbo

  • 利用 Idle 机制,在读写都空闲时,触发心跳。
  • 使用 socks5 代替 http 协议,能代理 TCP、UDP。

Hadoop

  • HTTP 服务构建
  • 用 filter 来阻止 CSRF 跨站请求伪造
  • 分块写(chucked write)

开源贡献注意事项

  • 代码风格和项目保持一致
  • Deprecated 要写注释,建议用哪个方法或类来替代
  • 功能性提升,要做单元测试;性能优化,要做性能测试(benchmark)
  • 400 行以内为一个 PR,作为一次 Code Review

参考资料

  • 极客时间《Netty 源码剖析与实战》
0%