探秘Netty2:Netty与Java NIO

 

Netty与Java NIO的渊源

netty是什么

Netty是一个基于Java NIO的client-server网络服务框架,人们可以利用netty快速地开发网络应用。同时netty相对于其他网络框架更加简单并且扩展性更强,这主要得益于其提供的简单易用的api将业务逻辑和网络处理代码解耦开来。能够使你更加专注于业务的实现而不需要太多关心网络底层实现。

异步设计

netty所有的api都是异步的。异步处理已经不是什么新鲜事了,众所周知,IO已经变为一个应用的瓶颈,而异步处理正是为了解决这个问题出现的。

CallBacks机制

CallBacks机制经常应用于异步处理,人们可以指定方法执行完后的回调函数,在JavaScript中,回调机制是其语言的核心。下面代码展示了如何利用回调机制处理接受数据。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public  interface  Fetcher { void fetchData(FetchCallback callback); } public  interface  FetchCallback { void onData(Data data); void onError(Throwable cause); } public  class  Worker { public void doWork() { Fetcher fetcher = … fetcher.fetchData( new FetchCallback() { @Override public void onData(Data data) { #1 System.out.println( “Data received: “ + data); } @Override public void onError(Throwable cause) { #2 System.err.println( “An error accour: “ + cause.getMessage()); } }); … } }

#1 没有出现错误,调用onData

#2 出现错误信息,调用onError

你可以将回调函数从当前线程移植到其他线程,但是并不能保证回调函数被执行。当你将多个异步回调函数串起来的时候会形成spaghetti code(管式代码),有些人认为这样的代码很难读,但JavaScript以及Node.js都是这种风格。

Futures机制

异步处理使用的第二个机制是Future机制。一个Future对象只有在特定情况下才会有值,Future对象要么是调用者的返回结果,要么是一个异常。Java在java.util.concurrent包中提供了供其线程池机制使用的Future接口,例如当你使用ExecutorService.submit()提交一个Runable任务时,就可以返回一个Future对象,利用Future对象可以判断该任务是否完成。如下所示:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 ExecutorService executor = Executors.newCachedThreadPool(); Runnable task1 =  new Runnable() { @Override public void run() { doSomeHeavyWork(); } … } Callable<Interger> task2 =  new Callable() { @Override public Integer call() { return doSomeHeavyWorkWithResul(); } … } Future<gt; future1 = executor.submit(task1); Future<Integer> future2 = executor.submit(task2); while (!future1.isDone() || !future2.isDone()) { … // do something else … }

CallBacks和Future是异步处理中最常用的两种机制,实际上无法判断两种机制的优劣,而Netty则会两种都提供,你可以自由选择使用哪种机制。

JVM中的阻塞与非阻塞比较

随着web应用的持续增长,如何提升网络应用的效率变得尤为重要。幸运的是从1.4版本开始,java提供了NIO API来供我们编写更有效率的网络应用。Java 7中又引入的NIO.2不仅仅是之前api的升级,同时也允许我们更加高效方便地编写异步代码。

New or non-blocking/p>

The N in NIO is typically thought to mean non-blocking rather than new.NIO has beenaround for so long now that nobody calls it new IO anymore. Most people refer to it as non-blocking IO

/p>

阻塞IO

 

上图所示为典型的阻塞IO模式,一个线程处理一个网络连接,因此应用能够处理连接的个数是由JVM上允许建立的线程个数决定的。

非阻塞IO

再来看下非阻塞IO模式,上图运用selector机制来处理多个连接。下面通过一个回显服务器示例来讲解非阻塞及阻塞IO的区别。

 

阻塞IO

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public  class  PlainEchoServer { public void serve(int port) throws IOException { final ServerSocket socket = new ServerSocket(port); #1 try { while ( true) { final Socket clientSocket = socket.accept(); #2 System.out.println( “Accepted connection from “ + clientSocket); new Thread(new Runnable() { #3   @Override public void run() { try { BufferedReader reader =  new BufferedReader( new InputStreamReader(clientSocket.getInputStream())); PrintWriter writer =  new PrintWriter(clientSocket .getOutputStream(),  true); while (true) { #4 writer.println(reader.readLine()); writer.flush(); } }  catch (IOException e) { e.printStackTrace(); try { clientSocket.close(); }  catch (IOException ex) { // ignore on close } } } }).start(); #5 } }  catch (IOException e) { e.printStackTrace(); } } }

# 1 绑定监听端口

# 2 阻塞至有新连接进来

# 3 新建线程用来处理客户端连接

# 4 从客户端读取数据并回写

# 5 启动线程

上述服务器代码要求每次连接进来一个请求就需要创建一个新的线程,即使使用线程池也仅能解决一时问题,不能再根本上解决问题:客户端的连接数取决于后台处理线程的个数。当连接数多时则会带来大问题。

非阻塞IO

在介绍NIO之前,我们先了解一些NIO的基本知识

BYTEBUFFER

ByteBuffer在Netty中即为重要,其主要是用来缓存数据的。ByteBuffer既可以分配到堆内存中也可以分配到堆外内存。一般来说,堆外内存能够更加快速地传递给channel,但分配和释放会更耗时。新旧的NIO API对ByteBuffer提供了统一的管理。ByteBuffer能够实现无拷贝地在各个实例之间共享,同时允许对可见数据进行切片和其他操作处理。

Slicing

Slicing a ByteBuffer allows to create a new ByteBuffer that share the same data as the intialByteBuffer but only expose a sub-region of it. This is useful to minimize memory copies whilestill only allow access to a part of the data

ByteBuffer有以下几个重要的操作

  • 将数据写进ByteBuffer
  • 调用ByteBuffer.flip()切换到读模式
  • 从ByteBuffer中读取数据
  • 调用ByteBuffer.clear()或者ByteBuffer.compact()来整理ByteBuffer内存

当往ByteBuffer中写数据时,ByteBuffer会通过更新buffer中write index的位置来跟踪buffer中的数据(也可以手动更新)。当需要从ByteBuffer中读取数据时,需要调用flip()来切换到读模式,flip()会将buffer的读起始位置设置为0,这样就可以读取buffer中所有数据了。

为了能够再次向ByteBuffer中写数据,可以将buffer模式切换到写模式并调用任意下列两个方法。

  • ByteBuffer.clear():清除ByteBuffer
  • ByteBuffer.compact():通过内存拷贝清除已经读过的数据

ByteBuffer.compact()会将所有未读的数据拷贝到buffer的起始位置。如下所示为ByteBuffer的使用示例

1 2 3 4 5 6 7 8 9 10 11 12 13 14 Channel inChannel = ….; ByteBuffer buf = ByteBuffer.allocate( 48); int bytesRead = – 1; do { bytesRead = inChannel.read(buf); #1 if (bytesRead != – 1) { buf.flip(); #2 while(buf.hasRemaining()){ System.out.print((char) buf.get()); #3 } buf.clear(); #4 } }  while (bytesRead != – 1); inChannel.close();

#1 从channel中读取数据到ByteBuffer

#2 切换模式至读模式

#3 读取buffer中的数据,每次调用一个get()会将buffer当前位置更新+1

#4 切换buffer至写模式,使其可以重新写

使用Selector模式

Selector可以监听多个IO是否可以读/写,这样一个Selector就可以用来处理多个连接,相比于阻塞IO每个连接占用一个线程,Selector模式更加高效。

通过以下几个操作就可以轻松运用Selector

  1. 在channels上创建一个或多个Selector
  2. 在channel上注册需要监听的事件,目前支持四种事件
    • OP_ACCEPT:socket-accept事件
    • OP_CONNECT:socket-connect事件
    • OP_READ:可读事件
    • OP_WRITE:可写事件
  3. channel注册后,调用Selector.select()方法阻塞直到上述注册的一个事件发生
  4. 当Selector.select()返回时,可以通过SelectionKey实例获取所有可操作的事件

下面EchoServer是基于非阻塞Selector的服务器代码,运用这个版本的Server可以运用一个线程处理上千个连接。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public  class  PlainNioEchoServer { public void serve(int port) throws IOException { System.out.println( “Listening for connections on port “ + port); ServerSocketChannel serverChannel = ServerSocketChannel.open(); ServerSocket ss = serverChannel.socket(); InetSocketAddress address =  new InetSocketAddress(port); ss.bind(address); #1 serverChannel.configureBlocking( false); Selector selector = Selector.open(); serverChannel.register(selector, SelectionKey.OP_ACCEPT); #2 while ( true) { try { selector.select(); #3 }  catch (IOException ex) { ex.printStackTrace(); // handle in a proper way break; } Set readyKeys = selector.selectedKeys(); #4 Iterator iterator = readyKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = (SelectionKey) iterator.next(); iterator.remove(); #5 try { if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel)key.channel(); SocketChannel client = server.accept(); #6 System.out.println( “Accepted connection from” + client); client.configureBlocking( false); client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, ByteBuffer.allocate(100)); #7 } if (key.isReadable()) { #8 SocketChannel client = (SocketChannel) key.channel(); ByteBuffer output = (ByteBuffer) key.attachment(); client.read(output); #9 } if (key.isWritable()) { #10 SocketChannel client = (SocketChannel) key.channel(); ByteBuffer output = (ByteBuffer) key.attachment(); output.flip(); client.write(output); #11 output.compact(); } }  catch (IOException ex) { key.cancel(); try { key.channel().close(); }  catch (IOException cex) { } } } } } }

#1 绑定Server的port

#2 注册channel的OP_ACCEPT Selector事件,监听新连接

#3 阻塞直到有新的连接事件到来

#4 获取所有可操作的SelectedKey实例

#5 遍历SelectedKey实例,将遍历过的去除

#6 获取新的连接

#7 将新的连接注册到Selector中,并监听读/写事件

#8 检查SelectKey是否可读

#9 读数据

#10 检测是否可写

#11 写数据

上述代码实现起来比较繁琐,新的NIO API去掉了大部分繁琐的过程,使实现起来更加简单明了

基于NIO.2的EchoServer

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 public  class  PlainNio2EchoServer { public void serve(int port) throws IOException { System.out.println( “Listening for connections on port “ + port); final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open(); InetSocketAddress address =  new InetSocketAddress(port); serverChannel.bind(address); #1 final CountDownLatch latch =  new CountDownLatch( 1); serverChannel.accept( nullnew CompletionHandler<AsynchronousSocketChannel, Object>() { #2   @Override public void completed(final AsynchronousSocketChannel channel, Object attachment) { serverChannel.accept(null, this); #3 ByteBuffer buffer = ByteBuffer.allocate( 100); channel.read(buffer, buffer, new EchoCompletionHandler(channel)); #4 @Override public void failed (Throwable throwable, Object attachment){ try { serverChannel.close(); #5 }  catch (IOException e) { // ingnore on closefinally { latch.countDown(); } } });  try   { latch.await(); }  catch( InterruptedException e)   { Thread.currentThread().interrupt(); } } private  final  class  EchoCompletionHandler  implements CompletionHandler< IntegerByteBuffer> { private  final AsynchronousSocketChannel channel;   EchoCompletionHandler(AsynchronousSocketChannel channel) { this.channel = channel; }   @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); channel.write(buffer, buffer, new CompletionHandler<Integer, #6 ByteBuffer>() {   @Override public void completed(Integer result, ByteBuffer buffer) { if (buffer.hasRemaining()) { channel.write(buffer, buffer, this); #7 }  else { buffer.compact(); channel.read(buffer, buffer, EchoCompletionHandler.this); #8 } }   @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); }  catch (IOException e) { 来源:黄小斜

声明:本站部分文章及图片转载于互联网,内容版权归原作者所有,如本站任何资料有侵权请您尽早请联系jinwei@zod.com.cn进行处理,非常感谢!

上一篇 2018年5月17日
下一篇 2018年5月17日

相关推荐