2015-10-27 09:18:55.0|分类: netty|浏览量: 7348
摘要 1、Netty中boss线程池大小为1,worker线程池大小为8, new NioEventLoopGroup(8),其余线程分配给业务使用。 2、由于超时时间过长,100W个长链接链路会创建100w长连接对象(比如channel/IdlChannelHandlerAdapter/ScheduledFutureTask ...),每个对象还保存有业务的成员变量,非常消耗内存。一些定时任务被老化到持久代中,没有被JVM垃圾回收掉,内存一直在增长,用户误认为存在内存泄露。 所以说一些长时间没有通信的链接需要关闭掉。 定时心跳显得很重要,如果300s没有收到任何信息,则把链接关闭掉。 3、不要在Netty的I/O线程上处理业务(心跳发送和检测除外)。为什么呢?? 因为Java进程,线程不能无限增长。这就意味着Netty的Reactor线程数必须收敛。但是在实际业务处理中,经常会有一些额外的复杂逻辑处理,例如性能统计、记录接口日志等,这些业务操作性能开销也比较大,如果在I/O线程上直接做业务逻辑处理,可能会阻塞I/O线程,影响对其它链路的读写操作,或者导致一些链路的关闭或者打开比较慢。 4、写日志的时候小心 在生产环境中,需要实时打印接口日志,其它日志处于ERROR级别,当推送服务发生I/O异常之后,会记录异常日志。如果当前磁盘的WIO比较高,可能会发生写日志文件操作被同步阻塞,阻塞时间无法预测。这就会导致Netty的NioEventLoop线程被阻塞,Socket链路无法被及时关闭、其它的链路也无法进行读写操作等 5、-Xmx:JVM最大内存需要根据内存模型进行计算并得出相对合理的值; 关键词 推送信息状况 信息阅读的方式比较多,例如:pc、手机、ipad、穿戴设备等等 现在使用网络质量并不是稳定的,网络主要是运营商的无线移动网,例如在地铁上、山区、沙漠信号就很差,容易发生网络闪断; 海量的终端链接,而且通常使用长连接,服务器维持大量长连接,资源消耗都非常大; 谷歌开发的推送框架无法再国内使用,android的应用必须自己 由于谷歌的推送框架无法在国内使用,Android的长连接是由每个应用各自维护的,这就意味着每台安卓设备上会存在多个长连接。即便没有消息需要推送,长连接本身的心跳消息量也是非常巨大的,这就会导致流量和耗电量的增加; 不稳定:消息丢失、重复推送、延迟送达、过期推送时有发生; 垃圾消息到处是。 netty框架
netty介绍 Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。 netty优点
统一的API,适用于不同的协议(阻塞和非阻塞),比如http/tcp/udp 基于灵活、可扩展的事件驱动模型,采用了链式的事件模型 高度可定制的线程模型 可靠的无连接数据Socket支持(UDP) 更好的吞吐量,低延迟 netty架构 平台关键设计 链接合法性验证 mina客户端和服务器端建立了一个连接,并且为这个链接打开一个长连接。建立一个channel,客户端登陆操作,服务器将连接和账号进行关联。当有一个请求到来的时候,首先检查连接是否合法,这个链接是否登陆,用户是否正确。 解码和编码 编码它将对象序列化为字节数组,用于网络传输、数据持久化等用途。反之,解码(Decode)/反序列化(deserialization)把从网络、磁盘等读取的字节数组还原成原始对象。 Netty的逻辑架构图: 从网络读取的inbound消息,需要经过解码,将二进制的数据报转换成应用层协议消息或者业务消息,才能够被上层的应用逻辑识别和处理;同理,用户发送到网络的outbound业务消息,需要经过编码转换成二进制字节数组(对于Netty就是ByteBuf)才能够发送到网络对端。编码和解码功能是NIO框架的有机组成部分,无论是由业务定制扩展实现,还是NIO框架内置编解码能力,该功能是必不可少的。 netty常用的解码器: LineBasedFrameDecoder解码器 LineBasedFrameDecoder是回车换行解码器,如果用户发送的消息以回车换行符作为消息结束的标识。 LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf中的可读字节,判断看是否有“\n”或者“\r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。 DelimiterBasedFrameDecoder解码器 DelimiterBasedFrameDecoder是分隔符解码器,用户可以指定消息结束的分隔符,它可以自动完成以分隔符作为码流结束标识的消息的解码。回车换行解码器实际上是一种特殊的DelimiterBasedFrameDecoder解码器。 FixedLengthFrameDecoder解码器 FixedLengthFrameDecoder是固定长度解码器,它能够按照指定的长度对消息进行自动解码,开发者不需要考虑TCP的粘包/拆包等问题,非常实用。 对于定长消息,如果消息实际长度小于定长,则往往会进行补位操作,它在一定程度上导致了空间和资源的浪费。但是它的优点也是非常明显的,编解码比较简单,因此在实际项目中仍然有一定的应用场景。 LengthFieldBasedFrameDecoder解码器 ObjectEncoder编码器 心跳和超时检测 客户端和服务器建立长连接,服务器端会保存着这个长连接,然后对长连接进行轮询看看是否有新的消息。 当客户端socket在非正常情况家掉线,如: 断网,断电等特殊问题的时候, 客户端的channel对象不会自动关闭,还以为这个链接活跃,继续保持着。当服务器端向客户端推送消息的时候,信息已经发送出去了,但是客户端没有收到。 务端增加对空闲时间处理pipeline.addLast("ping", new IdleStateHandler(60, 15, 13,TimeUnit.SECONDS)) 然后在业务逻辑的Handler里面,重写 userEventTriggered(ChannelHandlerContext ctx, Object evt),如果获取到IdleState.ALL_IDLE则定时向客户端发送心跳包;客户端在业务逻辑的Handler里面,如果接到心跳包,则向服务器发送一个心跳反馈;服务端如果长时间没有接受到客户端的信息,即IdleState.READER_IDLE被触发,则关闭当前的channel。 Netty提供的空闲检测机制分为三种: 1) 读空闲,链路持续时间t没有读取到任何消息; 2) 写空闲,链路持续时间t没有发送任何消息; 3) 读写空闲,链路持续时间t没有接收或者发送任何消息。 Netty的默认读写空闲机制是发生超时异常,关闭连接,但是,我们可以定制它的超时实现机制,以便支持不同的用户场景。 服务器开启流程 步骤1:创建ServerBootstrap实例。ServerBootstrap是Netty服务端的启动辅助类,它提供了一系列的方法用于设置服务端启动相关的参数。底层通过门面模式对各种能力进行抽象和封装,尽量不需要用户跟过多的底层API打交道,降低用户的开发难度。 我们在创建ServerBootstrap实例时,会惊讶的发现ServerBootstrap只有一个无参的构造函数,作为启动辅助类这让人不可思议,因为它需要与多个其它组件或者类交互。ServerBootstrap构造函数没有参数的根本原因是因为它的参数太多了,而且未来也可能会发生变化,为了解决这个问题,就需要引入Builder模式。《Effective Java》第二版第2条建议遇到多个构造器参数时要考虑用构建器,关于多个参数构造函数的缺点和使用构建器的优点大家可以查阅《Effective Java》,在此不再详述。 步骤2:设置并绑定Reactor线程池。Netty的Reactor线程池是EventLoopGroup,它实际就是EventLoop的数组。EventLoop的职责是处理所有注册到本线程多路复用器Selector上的Channel,Selector的轮询操作由绑定的EventLoop线程run方法驱动,在一个循环体内循环执行。值得说明的是,EventLoop的职责不仅仅是处理网络I/O事件,用户自定义的Task和定时任务Task也统一由EventLoop负责处理,这样线程模型就实现了统一。从调度层面看,也不存在在EventLoop线程中再启动其它类型的线程用于异步执行其它的任务,这样就避免了多线程并发操作和锁竞争,提升了I/O线程的处理和调度性能。 步骤3:设置并绑定服务端Channel。作为NIO服务端,需要创建ServerSocketChannel,Netty对原生的NIO类库进行了封装,对应实现是NioServerSocketChannel。对于用户而言,不需要关心服务端Channel的底层实现细节和工作原理,只需要指定具体使用哪种服务端Channel即可。因此,Netty的ServerBootstrap方法提供了channel方法用于指定服务端Channel的类型。Netty通过工厂类,利用反射创建NioServerSocketChannel对象。由于服务端监听端口往往只需要在系统启动时才会调用,因此反射对性能的影响并不大。相关代码如下所示: 步骤4:链路建立的时候创建并初始化ChannelPipeline。ChannelPipeline并不是NIO服务端必需的,它本质就是一个负责处理网络事件的职责链,负责管理和执行ChannelHandler。网络事件以事件流的形式在ChannelPipeline中流转,由ChannelPipeline根据ChannelHandler的执行策略调度ChannelHandler的执行。典型的网络事件如下: 链路注册; 链路激活; 链路断开; 接收到请求消息; 请求消息接收并处理完毕; 发送应答消息; 链路发生异常; 发生用户自定义事件。 步骤5:初始化ChannelPipeline完成之后,添加并设置ChannelHandler。ChannelHandler是Netty提供给用户定制和扩展的关键接口。利用ChannelHandler用户可以完成大多数的功能定制,例如消息编解码、心跳、安全认证、TSL/SSL认证、流量控制和流量整形等。Netty同时也提供了大量的系统ChannelHandler供用户使用,比较实用的系统ChannelHandler总结如下: 系统编解码框架-ByteToMessageCodec; 通用基于长度的半包解码器-LengthFieldBasedFrameDecoder; 码流日志打印Handler-LoggingHandler; SSL安全认证Handler-SslHandler; 链路空闲检测Handler-IdleStateHandler; 流量整形Handler-ChannelTrafficShapingHandler; Base64编解码-Base64Decoder和Base64Encoder。 步骤6:绑定并启动监听端口。在绑定监听端口之前系统会做一系列的初始化和检测工作,完成之后,会启动监听端口,并将ServerSocketChannel注册到Selector上监听客户端连接,相关代码如下: private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); } 步骤7:Selector轮询。由Reactor线程NioEventLoop负责调度和执行Selector轮询操作,选择准备就绪的Channel集合,相关代码如下: private void select() throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. logger.warn( "Selector.select() returned prematurely {} times in a row; rebuilding selector.", selectCnt); rebuildSelector(); selector = this.selector; // Select again to populate selectedKeys. selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = System.nanoTime(); } if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1); } } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e); } // Harmless exception - log anyway } } 步骤8:当轮询到准备就绪的Channel之后,就由Reactor线程NioEventLoop执行ChannelPipeline的相应方法,最终调度并执行ChannelHandler,代码如下: 步骤9:执行Netty系统ChannelHandler和用户添加定制的ChannelHandler。ChannelPipeline根据网络事件的类型,调度并执行ChannelHandler,相关代码如下所示: public interface ChannelHandler { //////////////////////////////// // Handler life cycle methods // //////////////////////////////// /** * Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events. */ void handlerAdded(ChannelHandlerContext ctx) throws Exception; /** * Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events * anymore. */ void handlerRemoved(ChannelHandlerContext ctx) throws Exception; /////////////////////////////////// // Inbound event handler methods // /////////////////////////////////// /** * Gets called if a {@link Throwable} was thrown. */ void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; /** * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop} */ void channelRegistered(ChannelHandlerContext ctx) throws Exception; /** * The {@link Channel} of the {@link ChannelHandlerContext} is now active */ void channelActive(ChannelHandlerContext ctx) throws Exception; /** * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its * end of lifetime. */ void channelInactive(ChannelHandlerContext ctx) throws Exception; /** * Invoked when the current {@link Channel} has read a message from the peer. */ void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception; /** * Invoked when the last message read by the current read operation has been consumed by * {@link #channelRead(ChannelHandlerContext, Object)}. If {@link ChannelOption#AUTO_READ} is off, no further * attempt to read an inbound data from the current {@link Channel} will be made until * {@link ChannelHandlerContext#read()} is called. */ void channelReadComplete(ChannelHandlerContext ctx) throws Exception; /** * Gets called if an user event was triggered. */ void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception; /** * Gets called once the writable state of a {@link Channel} changed. You can check the state with * {@link Channel#isWritable()}. */ void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception; 。。。 } netty线程模型 一个NioEventLoop聚合了一个多路复用器Selector,因此可以处理成百上千的客户端连接,Netty的处理策略是每当有一个新的客户端接入,则从NioEventLoop线程组中顺序获取一个可用的NioEventLoop,当到达数组上限之后,重新返回到0,通过这种方式,可以基本保证各个NioEventLoop的负载均衡。一个客户端连接只注册到一个NioEventLoop上,这样就避免了多个IO线程去并发操作它。 @Override protected int doReadMessages(List buf) throws Exception { SocketChannel ch = javaChannel().accept(); try { if (ch != null) { buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; } Netty通过串行化设计理念降低了用户的开发难度,提升了处理性能。利用线程组实现了多个串行化线程水平并行执行,线程之间并没有交集,这样既可以充分利用多核提升并行处理能力,同时避免了线程上下文的切换和并发保护带来的额外性能损耗。 Netty是个异步高性能的NIO框架,它并不是个业务运行容器,因此它不需要也不应该提供业务容器和业务线程。合理的设计模式是Netty只负责提供和管理NIO线程,其它的业务层线程模型由用户自己集成,Netty不应该提供此类功能,只要将分层划分清楚,就会更有利于用户集成和扩展。 令人遗憾的是在Netty 3系列版本中,Netty提供了类似Mina异步Filter的ExecutionHandler,它聚合了JDK的线程池java.util.concurrent.Executor,用户异步执行后续的Handler。 ExecutionHandler是为了解决部分用户Handler可能存在执行时间不确定而导致IO线程被意外阻塞或者挂住,从需求合理性角度分析这类需求本身是合理的,但是Netty提供该功能却并不合适。原因总结如下: 1. 它打破了Netty坚持的串行化设计理念,在消息的接收和处理过程中发生了线程切换并引入新的线程池,打破了自身架构坚守的设计原则,实际是一种架构妥协; 2. 潜在的线程并发安全问题,如果异步Handler也操作它前面的用户Handler,而用户Handler又没有进行线程安全保护,这就会导致隐蔽和致命的线程安全问题; 3. 用户开发的复杂性,引入ExecutionHandler,打破了原来的ChannelPipeline串行执行模式,用户需要理解Netty底层的实现细节,关心线程安全等问题,这会导致得不偿失。 鉴于上述原因,Netty的后续版本彻底删除了ExecutionHandler,而且也没有提供类似的相关功能类,把精力聚焦在Netty的IO线程NioEventLoop上,这无疑是一种巨大的进步,Netty重新开始聚焦在IO线程本身,而不是提供用户相关的业务线程模型。 最近做netty项目是否发现,多个handler处理过滤的时候。使用channelHandlerContext.writeAndFlush(packet)发送信息的时候发现,这个handler中的 write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 没有被执行,最后研究发现 Netty线程开发最佳实践 时间可控的简单业务直接在IO线程上处理 如果业务非常简单,执行时间非常短,不需要与外部网元交互、访问数据库和磁盘,不需要等待其它资源,则建议直接在业务ChannelHandler中执行,不需要再启业务的线程或者线程池。避免线程上下文切换,也不存在线程并发问题。 复杂和时间不可控业务建议投递到后端业务线程池统一处理 对于此类业务,不建议直接在业务ChannelHandler中启动线程或者线程池处理,建议将不同的业务统一封装成Task,统一投递到后端的业务线程池中进行处理。过多的业务ChannelHandler会带来开发效率和可维护性问题,不要把Netty当作业务容器,对于大多数复杂的业务产品,仍然需要集成或者开发自己的业务容器,做好和Netty的架构分层。 业务线程避免直接操作ChannelHandler 对于ChannelHandler,IO线程和业务线程都可能会操作,因为业务通常是多线程模型,这样就会存在多线程操作ChannelHandler。为了尽量避免多线程并发问题,建议按照Netty自身的做法,通过将操作封装成独立的Task由NioEventLoop统一执行,而不是业务线程直接操作 故障定制 在大多数场景下,当底层网络发生故障的时候,应该由底层的NIO框架负责释放资源,处理异常等。上层的业务应用不需要关心底层的处理细节。但是,在一些特殊的场景下,用户可能需要感知这些异常,并针对这些异常进行定制处理,例如: 1) 客户端的断连重连机制; 2) 消息的缓存重发; 3) 接口日志中详细记录故障细节; 4) 运维相关功能,例如告警、触发邮件/短信等 Netty的处理策略是发生IO异常,底层的资源由它负责释放,同时将异常堆栈信息以事件的形式通知给上层用户,由用户对异常进行定制。这种处理机制既保证了异常处理的安全性,也向上层提供了灵活的定制能力。 不会丢消息 是这样子的 在前台 主进程在的时候就正常收取消息 长连接 主进程 挂了 比如 推出程序 就是 push进程在后台运行 来了消息 有push 消息通知 如果这两个情况都不满足 也不会丢消息 别人给你发了消息 这个时候消息在服务器 保存着 等你下次再上的时候 connect 会服务器 拉取 这个消息 叫离线消息 总的来说 通过上述三种方式 我们可以确保不会丢包丢消息 测试工具 VisualVM是一个集成多个JDK命令行工具的可视化工具。可以作为Java应用程序性能分析和运行监控的工具。开发人员可以利用它来监控、分析线程信息,浏览内存堆数据。系统管理员可以利用它来监测、控制Java应用程序横跨整个网络的情况。Java应用程序使用人员可以利用它来创建包含所有必要信息的Bug 报告。 实验测试 结束语 参考文献 http://ifeve.com/netty-mina-in-depth-1/ http://www.infoq.com/cn/articles/netty-high-performance/ http://www.infoq.com/cn/articles/netty-million-level-push-service-design-points/ http://www.coderli.com/netty-5-new-and-noteworthy/ http://www.infoq.com/cn/articles/netty-server-create |