博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
第二章 NIO入门
阅读量:4949 次
发布时间:2019-06-11

本文共 32927 字,大约阅读时间需要 109 分钟。

  • 传统的同步阻塞式I/O编程
  • 基于NIO的非阻塞编程
  • 基于NIO2.0的异步非阻塞(AIO)编程
  • 为什么要使用NIO编程
  • 为什么选择Netty

  第二章 NIO 入门

    2.1 传统的BIO编程

      2.1.1 BIO 通信模型图

      2.1.2 同步阻塞式I/O创建的TimeServer源码分析

package com.phei.netty.bio;import java.io.IOException;import java.net.ServerSocket;import java.net.Socket;public class TimeServer {    public static void main(String[] args)throws IOException{        int port = 8080;        if(args != null && args.length > 0){            try{                port = Integer.valueOf(port);            }catch(NumberFormatException e){//                port = 8080;            }        }        ServerSocket server = null;        try{//            如果端口合法且没有被占用,服务端监听成功            server = new ServerSocket(port);            System.out.println("The time server is start in port:" + port);            Socket socket = null;            while(true){//                如果没有客户端接入,则主线程阻塞在ServerSocket的accept操作上                socket = server.accept();                new Thread(new TimeServerHandler(socket)).start();            }        }finally{            if(server != null){                System.out.println("The time server close");                server.close();                server = null;            }        }    }}
package com.phei.netty.bio;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.io.PrintWriter;import java.net.Socket;import java.util.Date;public class TimeServerHandler implements Runnable {    private Socket socket;        public TimeServerHandler() {        // TODO Auto-generated constructor stub    }        public TimeServerHandler(Socket socket) {        super();        this.socket = socket;    }    @Override    public void run() {        BufferedReader in = null;        PrintWriter out = null;        try{//            输入流,获取客户端输出流信息            in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));//            输出流,放到客户端输入流中            out = new PrintWriter(this.socket.getOutputStream(),true);            String currentTime = null;            String body = null;            while(true){//                获取客户端输出的信息                body = in.readLine();                if(body == null){                    break;                }                System.out.println("The time server receive order : " + body);                currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER";//                发送信息到客户端输入流中                out.println(currentTime);            }        }catch(Exception e){            if(in != null){                try{                    in.close();                }catch(IOException e1){                    e1.printStackTrace();                }            }            if(out != null){                out.close();                out = null;            }            if(this.socket != null){                try{                    this.socket.close();                }catch(IOException e1){                    e1.printStackTrace();                }                this.socket = null;            }        }    }}

      2.1.3 同步阻塞式I/O创建的TimeClient源码分析

package com.phei.netty.bio;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.io.PrintWriter;import java.net.Socket;public class TimeClient {    public static void main(String[] args){        int port = 8080;        if(args != null && args.length > 0){            try{                port = Integer.valueOf(args[0]);            }catch(NumberFormatException e){                            }        }        Socket socket = null;        BufferedReader in = null;        PrintWriter out = null;        try{            socket = new Socket("127.0.0.1",port);//            输入流,获取服务端输出流信息            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));//            输出流,放到服务端输入流中            out = new PrintWriter(socket.getOutputStream(),true);//            发送信息到服务端            out.println("QUERY TIME ORDER");            System.out.println("Send order 2 server succeed.");//            读取输入流的信息            String resp = in.readLine();            System.out.println("Now is : " + resp);        }catch(Exception e){                    }finally{            if(out != null){                out.close();                out = null;            }            if(in != null){                try{                    in.close();                }catch(IOException e){                    e.printStackTrace();                }                in = null;            }            if(socket != null){                try{                    socket.close();                }catch(IOException e){                    e.printStackTrace();                }                socket = null;            }        }    }}

 

    2.2 伪异步I/O编程

      2.2.1 伪异步I/O模型图

      2.2.2 伪异步I/O创建的TimeServer源码分析

package com.phei.netty.pio;import java.io.IOException;import java.net.ServerSocket;import java.net.Socket;import com.phei.netty.bio.TimeServerHandler;public class TimeServer {    public static void main(String[] args) throws IOException{        int port = 8080;        if(args != null && args.length > 0){            try{                port = Integer.valueOf(args[0]);            }catch(NumberFormatException e){                            }        }        ServerSocket server = null;        try{            server = new ServerSocket(port);            System.out.println("The time server is start in port:" + port);            Socket socket = null;            //创建I/O任务线程池            TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool(50,10000);            while(true){                socket = server.accept();//                当接收到新的客户端连接时,将请求Socket封装成一个Task,然后调用线程池的execute方法执行,从而避免了每个请求接入都创建一个新的线程。                singleExecutor.execute(new TimeServerHandler(socket));            }        }finally{            if(server != null){                System.out.println("The time server close");                server.close();                server = null;            }        }    }}
package com.phei.netty.pio;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class TimeServerHandlerExecutePool {    private ExecutorService executor;        public TimeServerHandlerExecutePool(int maxPoolSize,int queueSize){        executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS, new ArrayBlockingQueue
(queueSize)); } public void execute(Runnable task){ executor.execute(task); }}

      2.2.3 伪异步I/O弊端分析

    2.3 NIO编程

      2.3.1 NIO类库简介

        1.缓冲区Buffer

          ByteBuffer

          CharBuffer

          ShortBuffer

          IntBuffer

          LongBuffer

          FloatBuffer

          DoubleBuffer

        2.通道Channel

          网络读写:SelectableChannel

          文件操作:FileChannel

        3.多路复用器Selector

          多路复用器提供选择已经就绪的任务的能力:Selector回不断地轮询注册在其上的Channel,如果某个Channel上面发生读或者写时间,这个Channel就处                               于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。

      2.3.2 NIO服务端序列图

      2.3.3 NIO创建的TimeServer源码分析

package com.phei.netty.nio;public class TimeServer {    public static void main(String[] args){        int port = 8080;        if(args != null && args.length > 0){            try{                port = Integer.valueOf(args[0]);            }catch(NumberFormatException e){                            }        }//        创建多路复用类MultiplexerTimeServer。它是一个独立的线程,负责轮询多路复用器Selctor,可以处理多个客户端的并发接入。        MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);        new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();    }}
package com.phei.netty.nio;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Date;import java.util.Iterator;import java.util.Set;public class MultiplexerTimeServer implements Runnable {    private Selector selector;    private ServerSocketChannel servChannel;    private volatile boolean stop;        /*     * 初始化多路复用器,绑定监听端口     * 在构造方法中创建多路复用器Selector、ServerSocketChannel,对Channel和TCP参数进行配置。     * 将ServlerSocketChannel设置为异步非阻塞模式,它的backlog设为1024。     * 系统资源初始化成功后,将ServerSocketChannel注册到Selector,监听SelectionKey.OP_ACCEPT操作位。     * 如果资源初始化失败,则退出。     */    public MultiplexerTimeServer(int port){        try{            //Opens a selector.            selector = Selector.open();            //Opens a server-socket channel.            servChannel = ServerSocketChannel.open();//            Adjusts this channel's non-blocking mode.            servChannel.configureBlocking(false);//            Retrieves a server socket associated with this channel.//            Binds the ServerSocket to a specific address (IP address and port number).//            1024 : requested maximum length of the queue of incoming connections.            servChannel.socket().bind(new InetSocketAddress(port),1024);//            Registers this channel with the given selector, returning a selection key.//            SelectionKey.OP_ACCEPT : Operation-set bit for socket-accept operations.            servChannel.register(selector,SelectionKey.OP_ACCEPT);            System.out.println("The time server is start in port:" + port);        }catch(IOException e){            e.printStackTrace();            System.exit(1);        }    }        public void stop(){        this.stop = true;    }        /**     * 在线程的run方法的while循环体中循环遍历selector,它的休眠时间为1s。     * 无论是否有读写等事件发生,selector每隔1s都被唤醒一次。     * selector也提供了一个无参的select方法:当有处于就绪状态的Channel时,selector将返回该Channel的SelectionKey集合。     * 通过对就绪状态的Channel集合进行迭代,可以进行网络的异步读写操作。     */    @Override    public void run() {        while(!stop){            try{//                Selects a set of keys whose corresponding channels are ready for I/O operations.//                timeout - If positive, block for up to timeout milliseconds, more or less, while waiting for a channel to become ready; //          if zero, block indefinitely; must not be negative                selector.select(1000);//                Returns this selector's selected-key set.                Set
selectedKeys = selector.selectedKeys(); Iterator
it = selectedKeys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try{ handleInput(key); }catch(Exception e){ if(key != null){// Requests that the registration of this key's channel with its selector be cancelled. key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } }catch (Throwable t) { t.printStackTrace(); } }// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源 if(selector != null){ try{ selector.close(); }catch(IOException e){ e.printStackTrace(); } } } /* * 处理新接入的客户端请求信息,根据SelectionKey的操作位进行判断即可获知网络时间的类型, * 通过ServerSocketChannel的accept接收客户端的连接请求并创建SocketChannel实例。 * 完成之后相当于完成了TCP的三次握手,TCP物理链路正是建立。 * 需要将新创建的SocketChannel设置为异步非阻塞,同时也可以对其TCP参数进行设置,如TCP接收和发送缓冲区的大小等。 */ private void handleInput(SelectionKey key) throws IOException{// Tells whether or not this key is valid. if(key.isValid()){// Tests whether this key's channel is ready to accept a new socket connection.// 处理新接入的请求消息 if(key.isAcceptable()){// Returns the channel for which this key was created. ServerSocketChannel ssc = (ServerSocketChannel) key.channel();// Accepts a connection made to this channel's socket. SocketChannel sc = ssc.accept();// Adjusts this channel's blocking mode. sc.configureBlocking(false);// Registers this channel with the given selector, returning a selection key.// The interest set for the resulting key sc.register(selector, SelectionKey.OP_READ); } /* * 用于读取客户端的请求消息。 * 首先创建一个ByteBuffer,由于事先无法得知客户端发送的码流大小,作为历程,开辟一个1MB的缓冲区。 * 然后调用SocketChannel的read方法读取请求码流。 */// Tests whether this key's channel is ready for reading. if(key.isReadable()){ SocketChannel sc = (SocketChannel) key.channel();// Allocates a new byte buffer. ByteBuffer readBuffer = ByteBuffer.allocate(1024);// Reads a sequence of bytes from this channel into the given buffer. int readBytes =sc.read(readBuffer); if(readBytes > 0){// 对readBuffer进行flip操作,将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作。// Flips this buffer. readBuffer.flip();// 根据缓冲区刻度的字节个数创建字节数组// Returns the number of elements between the current position and the limit. byte[] bytes = new byte[readBuffer.remaining()];// 调用ByteBuffer的get操作将缓冲区可读字节数组复制到新创建的字节数组中// Relative bulk get method. readBuffer.get(bytes); String body = new String(bytes,"UTF-8"); System.out.println("The time server receive order : " + body); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER"; doWrite(sc,currentTime); }else if(readBytes < 0){// 对端链路关闭 key.cancel(); sc.close(); }else{// 读到0字节,忽略 ; } } } } /* * 将应答消息异步发送给客户端。 * 由于SocketChannel是异步非阻塞的,它并不保证一次能够把需要发送的字节数组发送完,此时会出现“写半包”问题。 * 需要注册写操作,不断轮询Selector将没有发送完的ByteBuffer发送完毕, * 然后可以通过ByteBuffer的hasRemain()方法判断消息是否发送完成。 */ private void doWrite(SocketChannel channel,String response) throws IOException{ if(response != null && response.trim().length() > 0){ byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);// Relative bulk put method (optional operation). writeBuffer.put(bytes); writeBuffer.flip();// 调用SocketChannel的write方法将缓冲区中的字节数组发送出去// Writes a sequence of bytes to this channel from the given buffer. channel.write(writeBuffer); } }}

      2.3.4 NIO客户端序列图

      2.3.5 NIO创建的TimeClient源码分析

package com.phei.netty.nio;public class TimeClient {    public static void main(String[] args){        int port = 8080;        if(args != null && args.length > 0){            try{                port = Integer.valueOf(args[0]);            }catch(NumberFormatException e){                            }        }        new Thread(new TimeClientHandle("127.0.0.1",port),"TimeClient-001").start();    }}
package com.phei.netty.nio;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;public class TimeClientHandle implements Runnable {    private String host;    private int port;    private Selector selector;    private SocketChannel socketChannel;    private volatile boolean stop;        public TimeClientHandle() {        super();        // TODO Auto-generated constructor stub    }    /*     * 使用构造函数初始化NIO的多路复用器和SocketChannel对象。     * 创建SocketChannel之后将其设置为异步非阻塞模式。     * 在此可以设置SocketChannel的TCP参数     */    public TimeClientHandle(String string, int port) {        this.host = host == null ? "127.0.0.1" : host;        this.port = port;        try{            selector = Selector.open();            socketChannel = SocketChannel.open();            socketChannel.configureBlocking(false);        }catch(IOException e){            e.printStackTrace();            System.exit(1);        }    }    /*     * 作为示例,连接是成功的,所以不需要做重连操作,因此将其放到循环之前。     */    @Override    public void run() {        try{//            如果连接成功、如果没有成功            doConnect();        }catch(IOException e){            e.printStackTrace();            System.exit(1);        }//        轮询多路复用器Selector。当有就绪的Channel时,执行handleInput(key)方法。        while(!stop){            try{                selector.select(1000);                Set
selectedKeys = selector.selectedKeys(); Iterator
it = selectedKeys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try{// 当有就绪的Channel时执行 handleInput(key); }catch(Exception e){ if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } }catch(Exception e){ e.printStackTrace(); System.exit(1); } }// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源 if(selector != null){ try{ selector.close(); }catch(IOException e){ e.printStackTrace(); } } } private void handleInput(SelectionKey key) throws IOException{// Tells whether or not this key is valid. if(key.isValid()){// Returns the channel for which this key was created. SocketChannel sc = (SocketChannel) key.channel();// 判断是否连接成功// 如果处于连接状态,说明服务端已经返回ACK应答消息。// Tests whether this key's channel has either finished, or failed to finish, // its socket-connection operation. if(key.isConnectable()){// 调用SecoketChannel的finishConnect()方法。// 如果返回值为true,说明客户端连接成功;// 如果返回值为false或者抛出IOException,说明连接失败// Finishes the process of connecting a socket channel. if(sc.finishConnect()){// 将SocketChannel注册到多路复用器上,注册SelectionKey.OP_READ操作位,监听网络读操作,// 然后发送请求消息给服务端 sc.register(selector, SelectionKey.OP_READ);// 构造请求消息体,然后对其编码,写入到发送缓冲区中,最后调用SocketChannel的write方法进行发送。存在"半包写"// 最后通过hasRemaining()方法对发送结果进行判断 doWrite(sc); }else{ System.exit(1);//连接失败,进程退出 }       }// 测试此键的通道是否已准备好进行读取。 if(key.isReadable()){ ByteBuffer readBuffer = ByteBuffer.allocate(1024);// 将字节序列从此通道读入给定的缓冲区。 int readBytes = sc.read(readBuffer); if(readBytes > 0){// 反转此缓冲区。 readBuffer.flip();// 返回当前位置与限制之间的元素数。 byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes,"UTF-8"); System.out.println("Now is : " + body); this.stop = true; }else if(readBytes < 0){ //对端链路关闭 key.cancel(); sc.close(); }else{ ; //读到0字节,忽略 } } } } private void doConnect() throws IOException{// 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答// 连接此通道的套接字 if(socketChannel.connect(new InetSocketAddress(host, port))){// 连接成功,将SocketChannel注册到多路复用器Selector上,注册SelectionKey.OP_READ socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); }else{// 如果没有直接连接成功,则说明服务端没有返回TCP握手应答,但这并不代表连接失败。// 需要将SocketChannel注册到多路复用器Selector上,注册SelectionKey.OP_CONNECT,// 当服务端返回TCP syn-ack 消息后,Selector就能够轮询到这个SocketChannel处于连接就绪状态。 socketChannel.register(selector, SelectionKey.OP_CONNECT); } } private void doWrite(SocketChannel sc) throws IOException{ byte[] req = "QUERY TIME ORDER".getBytes();// 分配一个新的字节缓冲区。 ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip();// 将字节序列从给定的缓冲区中写入此通道。 sc.write(writeBuffer);// 告知在当前位置和限制之间是否有元素。 if(!writeBuffer.hasRemaining()){// 如果缓冲区中的消息全部发送完成,打印 System.out.println("Send order 2 server succeed."); } }}

     服务端控制台:

The time server is start in port:8080The time server receive order : QUERY TIME ORDER

     客户端控制台:

Send order 2 server succeed.

      socketChannel.connect(new InetSocketAddress(host, port):返回false

      key.isReadable():返回false

      WHY????????????????????????????

     2.4 AIO编程

      NIO 2.0 引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。

      异步通道提供一下两种方式获取操作结果:

        通过 java.util.concurrent.Future 类来表示异步操作的结果;

        在执行异步操作的时候传入一个 java.nio.channels.CompletionHandler接口的实现类作为操作完成的回调。

      NIO 2.0 的异步套接字通道是真正的异步非阻塞I/O,对应于UNIX网络编程中的事件驱动I/O(AIO)。它不需要通过多路复用器(Selector)对注册的通道进行轮询操作即可实现异步读写,从而简化了NIO的编程模型。

      2.4.1 AIO创建的TimeServer源码分析

package com.phei.netty.aio;public class TimeServer {    public static void main(String[] args){        int port = 8080;        if(args != null && args.length > 0){            try{                port = Integer.valueOf(args[0]);            }catch(NumberFormatException e){                //采用默认值            }        }//        创建异步的时间服务器处理类        AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port);//        启动线程        new Thread(timeServer,"AIO-AsyncTimeServerHandler-001").start();    }}
package com.phei.netty.aio;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.AsynchronousServerSocketChannel;import java.util.concurrent.CountDownLatch;public class AsyncTimeServerHandler implements Runnable {    private int port;    CountDownLatch latch;    AsynchronousServerSocketChannel asynchronousServerSocketChannel;    //    创建一个异步的服务端通道AsynchronousServerSocketChannel,然后调用它的bind方法绑定监听端口。    public AsyncTimeServerHandler(int port){        this.port = port;        try{            asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();            asynchronousServerSocketChannel.bind(new InetSocketAddress(port));            System.out.println("The time server is start in port : " + port);        }catch(IOException e){            e.printStackTrace();        }    }    @Override    public void run() {//        在完成一组正在执行的操作之前,允许当前的线程一直阻塞。//        在本例中,我们让线程在此阻塞,防止服务端执行完成退出。//        在实际项目应用中,不需要启动独立的线程来处理AsynchronousServerSocketChannel,这里仅仅是个demo演示        latch = new CountDownLatch(1);//        用于接收客户端的连接,由于是异步操作,可以传递一个CompletionHandler
类型的handler实例接收accept操作成功的通知消息 doAccept(); try{ latch.await(); }catch(InterruptedException e){ e.printStackTrace(); } } private void doAccept() { asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler()); }}
package com.phei.netty.aio;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;public class AcceptCompletionHandler implements CompletionHandler
{ @Override public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {// 从attachment获取成员变量AsynchronousServerSocketChannel,然后继续调用她的accept方法// 调用AsynchronousServerSocketChannel的accept方法后,// 如果有新的客户端连接接入,系统将回调传入的CompletionHandler实例的completed方法,表示新的客户端已经接入成功。// 因为一个AsynchronousServerSocketChannel可以接收成千上万个客户端,所以需要继续调用它的accept方法,// 接收其他的客户端连接,最终形成一个循环。每当接收一个客户读连接成功之后,在异步连接新的客户端连接。 attachment.asynchronousServerSocketChannel.accept(attachment, this);// 链路建立成功之后,服务端需要接收客户端的请求消息。// 创建新的ByteBuffer,预分配1MB的缓冲区。 ByteBuffer buffer = ByteBuffer.allocate(1024);// 通过调用AsynchronousSocketChannel的read方法进行异步读操作。// ByteBuffer dst : 接收缓冲区,用于从异步Channel中读取数据包;// A attachment : 异步Channel携带的附件,通知回调的时候作为入参使用;// CompletionHandler
: 接收通知回调的业务Handler,在例程中为ReadCOmpletionHandler result.read(buffer,buffer,new ReadCompletionHandler(result)); } @Override public void failed(Throwable exc, AsyncTimeServerHandler attachment) { exc.printStackTrace(); attachment.latch.countDown(); }}
package com.phei.netty.aio;import java.io.IOException;import java.io.UnsupportedEncodingException;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import java.util.Date;public class ReadCompletionHandler implements CompletionHandler
{ private AsynchronousSocketChannel channel; // 将AsynchronousSocketChannel通过参数传递到ReadCompletionHandler中,// 当作成员变量来使用,主要用于读取半包消息和发送应答 public ReadCompletionHandler(AsynchronousSocketChannel channel) { if(this.channel == null){ this.channel = channel; } }// 读取到消息后的处理 @Override public void completed(Integer result, ByteBuffer attachment) {// 对attachment进行flip操作,为后续冲缓冲区读取数据做准备。 attachment.flip();// 根据缓冲区的刻度字节数创建byte数组 byte[] body = new byte[attachment.remaining()]; attachment.get(body); try{// 通过new String方法创建请求消息,对请求消息进行判断 String req = new String(body,"UTF-8"); System.out.print("The time server receive order : " + req); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";// 调用doWrite方法发送给客户端 doWriter(currentTime); }catch(UnsupportedEncodingException e){ e.printStackTrace(); } } private void doWriter(String currentTime) {// 对当前事件进行合法性校验 if(currentTime != null && currentTime.trim().length() > 0){// 调用字符串的解码方法将应答消息编码成字节数组,然后将它复制到发送缓冲区writeBuffer中 byte[] bytes = currentTime.getBytes(); ByteBuffer writerBuffer = ByteBuffer.allocate(bytes.length); writerBuffer.put(bytes); writerBuffer.flip();// 调用AsynchronousSocketChannel的异步write方法 channel.write(writerBuffer,writerBuffer,new CompletionHandler
() { @Override public void completed(Integer result, ByteBuffer buffer) {// 如果没有发送完成,继续发送,知道发送成功 if(buffer.hasRemaining()){ channel.write(buffer,buffer,this); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try{ channel.close(); }catch(IOException e){ //ingonre on close } } }); } }// 当发送异常的时候,对异常Throwable进行判断:如果I/O异常,就关闭链路,释放资源;// 如果是其他异常,按照业务自己的逻辑进行处理。本例程作为简单的demo,没有对异常进行分类判断,只要发生了读写异常,就关闭链路,释放资源。 @Override public void failed(Throwable exc, ByteBuffer attachment) { try{ this.channel.close(); }catch(IOException e){ e.printStackTrace(); } }}

      2.4.2 AIO创建的TimeClient源码分析

        Class :TimeClient

package com.phei.netty.aio;public class TimeClient {    public static void main(String[] args){        int port = 8080;        if(args != null && args.length > 0){            try{                port = Integer.valueOf(args[0]);            }catch(NumberFormatException e){                            }        }//        通过一个独立的I/O线程常见异步时间服务器客户端Handler。//        在实际项目中,我们不需要独立的线程创建异步连接对象,因为底层都是通过JDK的系统回调实现的,//        在后面运行时间服务器程序的时候,我们会抓取线程调用堆栈给大家展示        new Thread(new AsyncTimeClientHandler("127.0.0.1",port),"AIO-AsyncTimeClientHandler-001").start();    }}

        Class : AsyncTimeClientHandler

package com.phei.netty.aio;import java.io.IOException;import java.io.UnsupportedEncodingException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import java.util.concurrent.CountDownLatch;public class AsyncTimeClientHandler implements        CompletionHandler
, Runnable { private AsynchronousSocketChannel client; private String host; private int port; private CountDownLatch latch; public AsyncTimeClientHandler() { super(); } public AsyncTimeClientHandler(String host, int port) { super(); this.host = host; this.port = port; try{// 1.通过AsynchronousSocketChannel的open方法创建一个新的AsynchronousSocketChannel对象。 client = AsynchronousSocketChannel.open(); }catch(IOException e){ e.printStackTrace(); } } public void run() {// 2.创建CountDownLatch进行等待,防止异步操作没有执行完成线程就退出 latch = new CountDownLatch(1);// 3.通过connect方法发起异步操作// A attachment : AsynchronousSocketChannel 的附件,用于回调通知时作为入参被传递,调用者可自定义。// CompletionHandler
handler : 异步操作回调通知接口,由调用者实现// 在本例程中,这两个参数都使用AsyncTimeClientHandler类本身,因为它实现了CompletionHandler接口 client.connect(new InetSocketAddress(host,port),this,this); try{ latch.await(); }catch(InterruptedException e1){ e1.printStackTrace(); } try{ client.close(); }catch(IOException e){ e.printStackTrace(); } }// 4.异步连接成功之后的方法回调completed。 public void completed(Void result, AsyncTimeClientHandler attachment) {// 创建请求消息体,对其进行编码。 byte[] req ="QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);// 复制到发送缓冲区writeBuffer中, writeBuffer.put(req); writeBuffer.flip();// 调用AsynchronousSocketChannel的write方法进行异步写。// 实现CompletionHandler
接口用于写操作完成后的回调 client.write(writeBuffer,writeBuffer,new CompletionHandler
(){ public void completed(Integer result, ByteBuffer buffer) { if(buffer.hasRemaining()){ client.write(buffer,buffer,this); }else{ ByteBuffer readBuffer = ByteBuffer.allocate(1024);// 5.客户端异步读取时间服务器服务端应答消息的处理逻辑。// 调用AsynchronousSocketChannel的read方法异步读取服务端的响应消息// 由于read操作是异步的,所以我们通过内部匿名类实现CompletionHandler
接口,// 当读取完成被JDK回调时,构造应答消息。 client.read(readBuffer,readBuffer,new CompletionHandler
(){ public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try{ body = new String(bytes,"UTF-8"); System.out.println("Now is : " + body); latch.countDown(); }catch(UnsupportedEncodingException e){ e.printStackTrace(); } } public void failed(Throwable exc, ByteBuffer attachment) { try{ client.close(); latch.countDown(); }catch(IOException e){ // ingnore on close } } }); } }// 当读取发生异常时,关闭链路,同时调用CountDownLatch的countDown方法让AsyncTimeClientHandler线程执行完毕,客户端退出执行 public void failed(Throwable exc, ByteBuffer attachment) { try{ client.close(); latch.countDown(); }catch(IOException e){ //ingnore on close } } }); } public void failed(Throwable exc, AsyncTimeClientHandler attachment) { exc.printStackTrace(); try{ client.close(); latch.countDown(); }catch(IOException e){ e.printStackTrace(); } }}

      2.4.3 AIO版本时服务器的运行结果

    2.5 4中I/O的对比

      2.5.1 概念澄清

        1.异步非阻塞I/O

        2.多路复用器Selector

        3.伪异步I/O

      2.5.2 不同I/O模型对比

  同步阻塞I/O(BIO) 伪异步I/O 非阻塞I/O(NIO) 异步I/O(AIO)
客户端个数:I/O线程 1:1 M:N(其中M可以大于N) M:1(1个I/O线程处理多个客户端连接) M:0(不需要启动额外的I/O线程,被动回调)
I/O类型(阻塞) 阻塞I/O 阻塞I/O 非阻塞I/O 非阻塞I/O
I/O类型(同步) 同步I/O 同步I/O 同步I/O(I/O多路复用) 异步I/O
API使用难度 简单 简单 非常复杂 复杂
调试难度 简单 简单
可靠性 非常差
吞吐量

    2.6 选择Netty的理由

      2.6.1 不选择Java原生NIO编程的原因

      2.6.2 为什么选择Netty

    2.7 总结

    

  

转载于:https://www.cnblogs.com/ClassNotFoundException/p/6212107.html

你可能感兴趣的文章
hdfs 命令使用
查看>>
hdu 1709 The Balance
查看>>
prometheus配置
查看>>
定宽320 缩放适配手机屏幕
查看>>
BZOJ 2120 数颜色 【带修改莫队】
查看>>
【noip2004】虫食算——剪枝DFS
查看>>
Codeforces 40 E. Number Table
查看>>
CLR via C#(第3 版)
查看>>
java语法之final
查看>>
关于响应式布局
查看>>
详解ASP.Net 4中的aspnet_regsql.exe
查看>>
python 多进程和多线程的区别
查看>>
hdu1398
查看>>
[android] 网络断开的监听
查看>>
156.Binary Tree Upside Down
查看>>
MongoDB在windows下安装配置
查看>>
Upselling promotion stored procedure
查看>>
mysql编码配置
查看>>
KVM地址翻译流程及EPT页表的建立过程
查看>>
sigar
查看>>