这篇文章介绍了NIO的基本概念:
Java NIO提供了与标准IO不同的IO工作方式:
-
- Channels and Buffers(通道和缓冲区):标准的IO基于字节流和字符流进行操作的,而NIO是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。
- Asynchronous IO(异步IO):Java NIO可以让你异步的使用IO,例如:当线程从通道读取数据到缓冲区时,线程还是可以进行其他事情。当数据被写入到缓冲区时,线程可以继续处理它。从缓冲区写入通道也类似。
- Selectors(选择器):Java NIO引入了选择器的概念,选择器用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个的线程可以监听多个数据通道。
基于这篇文章
写了一个NIO Server和 Client的代码。其中也有一些需要注意的地方。
首先是在代码里面有一些写就绪的情况,这种情况有一些特殊:
一般来说,你不应该注册写事件。写操作的就绪条件为底层缓冲区有空闲空间,而写缓冲区绝大部分时间都是有空闲空间的,所以当你注册写事件后,写操作一直是就绪的,选择处理线程全占用整个CPU资源。所以,只有当你确实有数据要写时再注册写操作,并在写完以后马上取消注册。
下面代码里面可以看到,有一个处理写就绪的函数,是使用了SelectionKey的attachment来处理,并且根据attachment是否仍有数据,来选择使用 interestOps与否。查了一些资料,可能是因为intestOps不会清空attachment吧,需要再研究。
下面是server的代码:
package com.myapp.nio;import org.apache.log4j.LogManager;import org.apache.log4j.Logger;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;/** * Created by baidu on 16/11/17. */public class NioServer extends Thread{ private static final Logger logger = LogManager.getLogger(NioServer.class); private InetSocketAddress inetSocketAddress; private Handler handler = new ServerHandler(); public NioServer(String hostname, int port) { inetSocketAddress = new InetSocketAddress(hostname, port); } // 用Override校验继承合法性 @Override public void run() { try { Selector selector = Selector.open(); // 打开选择器 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打开通道 serverSocketChannel.configureBlocking(false); // 非阻塞 serverSocketChannel.socket().bind(inetSocketAddress); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); logger.info("Server: socket server stated on port " + inetSocketAddress.getPort()); while(true) { int nKeys = selector.select(); if (nKeys > 0) { SetselectionKeySet = selector.selectedKeys(); Iterator iterator = selectionKeySet.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); // 以下两种写法是等价的 //if ((selectionKey.readyOps() & SelectionKey.OP_ACCEPT) != 0) { if (selectionKey.isAcceptable()) { logger.info("Server: accepted"); handler.handleAccept(selectionKey); } //else if ((selectionKey.readyOps() & SelectionKey.OP_READ) != 0) { else if (selectionKey.isReadable()) { logger.info("Server: readable"); handler.handleRead(selectionKey); } //else if ((selectionKey.readyOps() & SelectionKey.OP_WRITE) != 0) { else if (selectionKey.isWritable()) { logger.info("Server: writable"); handler.handleWrite(selectionKey); } // Is below necessary? iterator.remove(); } } } } catch (IOException e) { e.printStackTrace(); } } interface Handler { void handleAccept(SelectionKey selectionKey) throws IOException; void handleRead(SelectionKey selectionKey) throws IOException; void handleWrite(SelectionKey selectionKey) throws IOException; } class ServerHandler implements Handler { public void handleAccept(SelectionKey selectionKey) throws IOException { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel(); SocketChannel socketChannel = serverSocketChannel.accept(); logger.info("Server: accept client socket " + socketChannel); socketChannel.configureBlocking(false); socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ); } public void handleRead(SelectionKey selectionKey) throws IOException { ByteBuffer byteBuffer = ByteBuffer.allocate(512); SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); // 改了原文中的一个小问题,重复循环忙等的问题 while (true) { int readBytes = socketChannel.read(byteBuffer); if (readBytes > 0) { logger.info("Server: readBytes: " + readBytes + ", data: " + new String(byteBuffer.array(), 0, readBytes)); // 这个flip是一定需要的, 会把limit和position重置,这样才能重新读到数据 byteBuffer.flip(); socketChannel.write(byteBuffer); } else { break; } } socketChannel.close(); } // handle Write这一块,其实需要再多研究一下 public void handleWrite(SelectionKey selectionKey) throws IOException { ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment(); byteBuffer.flip(); SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); socketChannel.write(byteBuffer); if (byteBuffer.hasRemaining()) { selectionKey.interestOps(SelectionKey.OP_READ); } } } public static void main(String[] args) { String hostname = "localhost"; int port = 8106; NioServer nioServer = new NioServer(hostname, port); nioServer.start(); }}
然后启动之后,用telnet作为客户端来访问一下:
$ telnet 127.0.0.1 8106Trying 127.0.0.1...Connected to localhost.Escape character is '^]'.hihihihihihiConnection closed by foreign host.如果没有byteBuffer.flip() 这个函数,那么不会有字符串返回。
打包: Project Structure-> Artifacts-> + -> Create Module with dependacies -> extract to the target JAR -> MANIFEST.MF路径最后的src改成resources.
然后 build->build artifact,就能在out目录里面有一个jar包,运行 java -jar xxx.jar
然后开始写客户端的代码:
package com.myapp.nio;import org.apache.log4j.LogManager;import org.apache.log4j.Logger;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SocketChannel;/** * Created by baidu on 16/11/17. */public class NioClient { private static final Logger logger = LogManager.getLogger(NioClient.class); private InetSocketAddress inetSocketAddress; public NioClient(String hostname, int port) { inetSocketAddress = new InetSocketAddress(hostname, port); } public void send(String requestData) { try { SocketChannel socketChannel = SocketChannel.open(inetSocketAddress); socketChannel.configureBlocking(false); ByteBuffer byteBuffer = ByteBuffer.allocate(512); socketChannel.write(ByteBuffer.wrap(requestData.getBytes())); while(true) { byteBuffer.clear(); int readBytes = socketChannel.read(byteBuffer); if (readBytes > 0) { byteBuffer.flip(); String getStr = new String(byteBuffer.array(), 0, readBytes); logger.info("Client: bytes: " + readBytes + "data: " + getStr); System.out.printf("Get return str: %s", getStr); socketChannel.close(); break; } } } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { String hostname = "localhost"; int port = 8106; String requestData = "HIHIHI here~~~"; new NioClient(hostname, port).send(requestData); }}
启动运行之后,命令行会返回:
log4j:WARN No appenders could be found for logger (com.myapp.nio.NioClient).log4j:WARN Please initialize the log4j system properly.log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.Get return str: HIHIHI here~~~Process finished with exit code 0
开始没有加log4j的properties,所以没有写日志,要加上。
log4j.properties
#log4j.rootLogger=INFO,Console,Filelog4j.rootLogger=INFO,File#控制台日志log4j.appender.Console=org.apache.log4j.ConsoleAppenderlog4j.appender.Console.Target=System.outlog4j.appender.Console.layout=org.apache.log4j.PatternLayoutlog4j.appender.Console.layout.ConversionPattern=[%p][%t][%d{yyyy-MM-dd HH\:mm\:ss}][%C] - %m%n#普通文件日志log4j.appender.File=org.apache.log4j.RollingFileAppenderlog4j.appender.File.File=logs/nio_client.loglog4j.appender.File.MaxFileSize=10MB#输出日志,如果换成DEBUG表示输出DEBUG以上级别日志log4j.appender.File.Threshold=ALLlog4j.appender.File.layout=org.apache.log4j.PatternLayoutlog4j.appender.File.layout.ConversionPattern=[%p][%t][%d{yyyy-MM-dd HH\:mm\:ss}][%C] - %m%nlog4j.appender.File.encoding=UTF-8
pom.xml也加上依赖:
4.0.0 com.myapp.nio nioClient 1.0-SNAPSHOT 1.2.17 log4j log4j ${log4j.version}
下次再写个线程池来处理看看。
(完)