基于Netty的websocket简单实现 参考文献:Netty 4.x 用户指南
Netty是个啥?其实Netty就是一个对JDK原生NIO进行了封装的框架,让你不需要关注JDK原生NIO的各种概念,便可以开发出一个网络应用。
官方点的说法就是Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
Netty的优势
使用JDK自带的NIO需要了解太多的概念,编程复杂,一不小心bug横飞
Netty底层IO模型可以随意切换,而这一切只需要做微小的改动:改改参数,Netty就可以直接从NIO模型切换为阻塞IO(BIO/OIO)模型
Netty自带的拆包解包,异常检测等机制让你从NIO的繁重细节中脱离出来,让你只需要关心业务逻辑
Netty解决了JDK的很多包括空轮询在内的bug
Netty底层对线程,selector做了很多细小的优化,精心设计的reactor线程模型做到非常高效的并发处理
定制能力强,可以通过ChannelHandler对通信框架进行灵活地扩展
Netty社区活跃,遇到问题随时邮件列表或者issue
Netty已经历各大rpc框架,消息中间件,分布式通信中间件线上的广泛验证,健壮性无比强大
下面就是一个利用Netty来实现的网络聊天室
DEMO 引入依赖
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.30.Final</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.7.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.7.0</version>
</dependency>
客户端处理类,用于处理合HTTP请求 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 public class HttpRequestHandler extends SimpleChannelInboundHandler <FullHttpRequest > { private final String wsUri; private static final File INDEX; static { URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation(); try { String path = location.toURI() + "WebsocketChatClient.html" ; path = !path.contains("file:" ) ? path : path.substring(5 ); INDEX = new File(path); } catch (URISyntaxException e) { throw new IllegalStateException("Unable to locate WebsocketChatClient.html" , e); } } public HttpRequestHandler (String wsUri) { this .wsUri = wsUri; } @Override public void channelRead0 (ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { if (wsUri.equalsIgnoreCase(request.getUri())) { ctx.fireChannelRead(request.retain()); } else { if (HttpHeaders.is100ContinueExpected(request)) { send100Continue(ctx); } RandomAccessFile file = new RandomAccessFile(INDEX, "r" ); HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK); response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8" ); boolean keepAlive = HttpHeaders.isKeepAlive(request); if (keepAlive) { response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length()); response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); } ctx.write(response); if (ctx.pipeline().get(SslHandler.class) == null ) { ctx.write(new DefaultFileRegion(file.getChannel(), 0 , file.length())); } else { ctx.write(new ChunkedNioFile(file.getChannel())); } ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); if (!keepAlive) { future.addListener(ChannelFutureListener.CLOSE); } file.close(); } } private static void 
send100Continue (ChannelHandlerContext ctx) { FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE); ctx.writeAndFlush(response); } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel incoming = ctx.channel(); System.out.println("Client:" +incoming.remoteAddress()+"异常" ); cause.printStackTrace(); ctx.close(); } }
处理 WebSocket frame 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler <TextWebSocketFrame > { public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0 (ChannelHandlerContext ctx,TextWebSocketFrame msg) throws Exception { Channel incoming = ctx.channel(); for (Channel channel : channels) { if (channel != incoming){ channel.writeAndFlush(new TextWebSocketFrame("[" + incoming.remoteAddress() + "]" + msg.text())); } else { channel.writeAndFlush(new TextWebSocketFrame("[you]" + msg.text() )); } } } @Override public void handlerAdded (ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 加入" )); channels.add(incoming); System.out.println("Client:" +incoming.remoteAddress() +"加入" ); } @Override public void handlerRemoved (ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 离开" )); System.out.println("Client:" +incoming.remoteAddress() +"离开" ); } @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("Client:" +incoming.remoteAddress()+"在线" ); } @Override public void channelInactive (ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("Client:" +incoming.remoteAddress()+"掉线" ); } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel incoming = ctx.channel(); 
System.out.println("Client:" +incoming.remoteAddress()+"异常" ); cause.printStackTrace(); ctx.close(); } }
初始化ChannelPipeline给channel 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class WebsocketChatServerInitializer extends ChannelInitializer <SocketChannel > { @Override public void initChannel (SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(64 *1024 )); pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new HttpRequestHandler("/ws" )); pipeline.addLast(new WebSocketServerProtocolHandler("/ws" )); pipeline.addLast(new TextWebSocketFrameHandler()); } }
编写主方法启动服务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public class WebsocketChatServer { private int port; public WebsocketChatServer (int port) { this .port = port; } public void run () throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new WebsocketChatServerInitializer()) .option(ChannelOption.SO_BACKLOG, 128 ) .childOption(ChannelOption.SO_KEEPALIVE, true ); System.out.println("WebsocketChatServer 启动了" ); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); System.out.println("WebsocketChatServer 关闭了" ); } } public static void main (String[] args) throws Exception { int port; if (args.length > 0 ) { port = Integer.parseInt(args[0 ]); } else { port = 8080 ; } new WebsocketChatServer(port).run(); } }
客户端 在程序的resources目录下新建WebsocketChatClient.html 页面来作为客户端
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>WebSocket Chat</title>
</head>
<body>
<script type="text/javascript">
    var socket;
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        // Derive the endpoint from the address the page was loaded from, so the client keeps
        // working when the server runs on a non-default port or another host. Fall back to the
        // original default when the page has no host (e.g. opened from the file system).
        var scheme = window.location.protocol === "https:" ? "wss://" : "ws://";
        var host = window.location.host || "127.0.0.1:8080";
        socket = new WebSocket(scheme + host + "/ws");
        socket.onmessage = function (event) {
            var ta = document.getElementById('responseText');
            ta.value = ta.value + '\n' + event.data;
        };
        socket.onopen = function (event) {
            var ta = document.getElementById('responseText');
            ta.value = "连接开启!";
        };
        socket.onclose = function (event) {
            var ta = document.getElementById('responseText');
            ta.value = ta.value + "连接被关闭";
        };
    } else {
        alert("你的浏览器不支持 WebSocket!");
    }

    // Sends the message only while the connection is open.
    function send(message) {
        if (!window.WebSocket) {
            return;
        }
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(message);
        } else {
            alert("连接没有开启.");
        }
    }
</script>
<form onsubmit="return false;">
    <h3>WebSocket 聊天室:</h3>
    <textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
    <br>
    <input type="text" name="message" style="width: 300px" value="hehe">
    <input type="button" value="发送消息" onclick="send(this.form.message.value)">
    <input type="button" onclick="document.getElementById('responseText').value=''" value="清空聊天记录">
</form>
<br>
<br>
</body>
</html>
先运行 WebsocketChatServer,再打开多个浏览器页面实现多个客户端访问 http://localhost:8080
那么,你就可以看到一个基于Netty实现的简单网络应用了。