2016-04-05 20:47:05.0|分类: netty|浏览量: 2723
|
客户端和服务器发送消息,首先需要建立长连接(会话)。有客户端向服务器发送会话请求,服务器收到申请后会检查该客户端是否为本服务器域内的,这个客户端的IP是否在黑名单中,如果客户端合法则开始会话,否则返回错误信息。 服务器端开启过程:监听指定端口号(例如8989),等待消息请求,然后进行处理。 // //获取当前系统的CPU数目*2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// bossGroup线程组实际就是Acceptor线程池,负责处理客户端的TCP连接请求,如果系统只有一个服务端端口需要监听,
// 则建议bossGroup线程组线程数设置为1。
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
// workerGroup是真正负责I/O读写操作的线程组,通过ServerBootstrap的group方法进行设置,用于后续的Channel绑定。
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channelFactory(
new NettyServerChannelFactory<ServerChannel>(
NioServerSocketChannel.class))
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
// .option(ChannelOption.SO_RCVBUF, 64) 20s读第一次数据
// .option(ChannelOption.SO_RCVBUF, 128) 30s
// .option(ChannelOption.SO_RCVBUF, 16)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel)
throws Exception {
socketChannel.pipeline().addLast(
new NettyChannelHandlerAdapter());
}
});
System.err.println(" 绑定端口>>" + SystemProperties.getSystemHost()
+ ":" + SystemProperties.getSystemPort());
// 绑定端口,同步等待成功
b.bind("127.0.0.1",8989).sync().channel()
.closeFuture().sync();
// 等待服务端监听端口关闭
// channelFuture.channel().closeFuture()
// channelFuture.channel().closeFuture().sync();
// channelFuture.channel().closeFuture().syncUninterruptibly();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}会话模块:负责客户端与服务器之间连接的会话管理模块,主要功能是:建立连接,断开连接,接收字节流,发送字节流等。
netty提供了ChannelHandlerAdapter类,ChannelHandlerAdapter拦截某些事件,比如信息发送,信息接收,会话建立,会话断开等等动作。 自定义类NettyChannelHandlerAdapter集成ChannelHandlerAdapter类,然后实现一些方法channelRegistered(),channelInactive(),close() class NettyChannelHandlerAdapter extends ChannelHandlerAdapter {
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if(SystemProperties.isLogError()){
log.error( " ctx.channel().id(): " + ctx.channel().id().toString());
}
super.channelRegistered(ctx);
allChannels.add( ctx.channel());
...
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if(SystemProperties.isLogError()){
log.error( " ctx.channel().id(): " + ctx.channel().id().toString());
}
super.channelInactive(ctx);
sendLog(ctx, Constant.SESSION_STATUS_CLOSE);
closeClientConnection(ctx);
allChannels.remove(ctx.channel());
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise){
super.close(ctx, promise);
closeClientConnection(ctx);
allChannels.remove(ctx.channel());
} |
