Java AIO
0
感觉上NIO开发要比AIO简单一点。
两者区别:http://bbym010.iteye.com/blog/2100868
代码参考:http://yunhaifeiwu.iteye.com/blog/1714664
上面文章的代码已经非常好了,注释也写的非常好,这里需要补充一些需要注意的地方:
解码
attachment.flip();
CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
decoder.onMalformedInput(CodingErrorAction.IGNORE); // 注意
content = decoder.decode(attachment).toString();
attachment.compact();
连接
socket.connect(new InetSocketAddress("localhost", 8888), socket, new ConnectHandler());
这里注意加上IP,不然连接打不开。
完整代码
Server
package com.demo.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Server {
public void server() throws IOException {
ExecutorService executor = Executors.newFixedThreadPool(20);
AsynchronousChannelGroup asyncChannelGroup = AsynchronousChannelGroup.withThreadPool(executor);
AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open(asyncChannelGroup).bind(new InetSocketAddress("localhost", 8888));
channel.accept(channel, new AcceptHandler());
}
private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {
@Override
public void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel attachment) {
attachment.accept(attachment, this);
ByteBuffer buffer = ByteBuffer.allocate(1024);
result.read(buffer, buffer, new ReaderHandler(result));
}
@Override
public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
}
}
private class ReaderHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel socket;
public ReaderHandler(AsynchronousSocketChannel socket) {
this.socket = socket;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
if(result > 0) {
String content = null;
try {
attachment.flip();
CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
// decoder.onMalformedInput(CodingErrorAction.IGNORE);
content = decoder.decode(attachment).toString();
attachment.compact();
System.out.println("收到客户端消息:" + content);
} catch (CharacterCodingException e) {
e.printStackTrace();
}
socket.read(attachment, attachment, this);
ByteBuffer client = ByteBuffer.wrap(("服务器回复消息:" + content).getBytes());
socket.write(client, client, new WriterHandler(socket));
} else if(result == 0) {
System.out.println("空消息");
} else {
attachment = null;
System.out.println("断开");
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
}
private class WriterHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel socket;
public WriterHandler(AsynchronousSocketChannel socket) {
this.socket = socket;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
if(result > 0) {
socket.write(attachment, attachment, this);
} else if(result == 0) {
System.out.println("空消息");
} else {
attachment = null;
System.out.println("断开");
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
}
public static void main(String[] args) throws IOException, InterruptedException {
new Server().server();
Thread.sleep(Integer.MAX_VALUE);
}
}
Client
package com.demo.aio;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Client {
private AsynchronousSocketChannel socket = null;
public void client() throws IOException {
ExecutorService executor = Executors.newFixedThreadPool(20);
AsynchronousChannelGroup asyncChannelGroup = AsynchronousChannelGroup.withThreadPool(executor);
socket = AsynchronousSocketChannel.open(asyncChannelGroup);
socket.setOption(StandardSocketOptions.TCP_NODELAY, true);
socket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
socket.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
socket.connect(new InetSocketAddress("localhost", 8888), socket, new ConnectHandler()); // 注意localhost
}
private class ConnectHandler implements CompletionHandler<Void, AsynchronousSocketChannel> {
@Override
public void completed(Void result, AsynchronousSocketChannel attachment) {
socket.write(ByteBuffer.wrap("客户端开始连接".getBytes()));
ByteBuffer clientBuffer = ByteBuffer.allocate(1024);
attachment.read(clientBuffer, clientBuffer, new ReaderHandler(attachment));
}
@Override
public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
}
}
private class ReaderHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel socket;
public ReaderHandler(AsynchronousSocketChannel socket) {
this.socket = socket;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
if(result > 0) {
String content = null;
try {
attachment.flip();
content = Charset.forName("UTF-8").newDecoder().decode(attachment).toString();
attachment.compact();
} catch (CharacterCodingException e) {
e.printStackTrace();
}
System.out.println(content);
socket.read(attachment, attachment, this);
} else if(result == 0) {
System.out.println("空消息");
} else {
attachment = null;
System.out.println("断开");
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
}
public void send() throws UnsupportedEncodingException {
Scanner scanner = new Scanner(System.in);
String tmp = null;
while((tmp = scanner.next()) != null) {
ByteBuffer buffer = ByteBuffer.wrap(tmp.getBytes("utf-8"));
System.out.println("客户端发出信息:" + tmp);
socket.write(buffer, buffer, new SenderHandler(socket));
}
}
private class SenderHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel socket;
public SenderHandler(AsynchronousSocketChannel socket) {
this.socket = socket;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
if(result > 0)
socket.write(attachment, attachment, this);
else
attachment = null;
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
}
public static void main(String[] args) throws IOException {
Client client = new Client();
client.client();
client.send();
}
}