2016-04-05 20:47:05.0|分类: netty|浏览量: 2202
客户端和服务器发送消息,首先需要建立长连接(会话)。有客户端向服务器发送会话请求,服务器收到申请后会检查该客户端是否为本服务器域内的,这个客户端的IP是否在黑名单中,如果客户端合法则开始会话,否则返回错误信息。 服务器端开启过程:监听指定端口号(例如8989),等待消息请求,然后进行处理。 // //获取当前系统的CPU数目*2 EventLoopGroup bossGroup = new NioEventLoopGroup(1); // bossGroup线程组实际就是Acceptor线程池,负责处理客户端的TCP连接请求,如果系统只有一个服务端端口需要监听, // 则建议bossGroup线程组线程数设置为1。 EventLoopGroup workerGroup = new NioEventLoopGroup(8); // workerGroup是真正负责I/O读写操作的线程组,通过ServerBootstrap的group方法进行设置,用于后续的Channel绑定。 try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channelFactory( new NettyServerChannelFactory<ServerChannel>( NioServerSocketChannel.class)) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) // .option(ChannelOption.SO_RCVBUF, 64) 20s读第一次数据 // .option(ChannelOption.SO_RCVBUF, 128) 30s // .option(ChannelOption.SO_RCVBUF, 16) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast( new NettyChannelHandlerAdapter()); } }); System.err.println(" 绑定端口>>" + SystemProperties.getSystemHost() + ":" + SystemProperties.getSystemPort()); // 绑定端口,同步等待成功 b.bind("127.0.0.1",8989).sync().channel() .closeFuture().sync(); // 等待服务端监听端口关闭 // channelFuture.channel().closeFuture() // channelFuture.channel().closeFuture().sync(); // channelFuture.channel().closeFuture().syncUninterruptibly(); } catch (Exception e) { e.printStackTrace(); } finally { // 优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } 会话模块:负责客户端与服务器之间连接的会话管理模块,主要功能是:建立连接,断开连接,接收字节流,发送字节流等。 netty提供了ChannelHandlerAdapter类,ChannelHandlerAdapter拦截某些事件,比如信息发送,信息接收,会话建立,会话断开等等动作。 自定义类NettyChannelHandlerAdapter集成ChannelHandlerAdapter类,然后实现一些方法channelRegistered(),channelInactive(),close() class NettyChannelHandlerAdapter extends ChannelHandlerAdapter { public void channelRegistered(ChannelHandlerContext ctx) throws Exception { if(SystemProperties.isLogError()){ log.error( " ctx.channel().id(): " + ctx.channel().id().toString()); } super.channelRegistered(ctx); allChannels.add( ctx.channel()); ... } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { if(SystemProperties.isLogError()){ log.error( " ctx.channel().id(): " + ctx.channel().id().toString()); } super.channelInactive(ctx); sendLog(ctx, Constant.SESSION_STATUS_CLOSE); closeClientConnection(ctx); allChannels.remove(ctx.channel()); } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise){ super.close(ctx, promise); closeClientConnection(ctx); allChannels.remove(ctx.channel()); } |