声明

该系列文章由书籍《Netty权威指南》第二版整理而来。只为记录学习笔记。
若认为内容侵权请及时通知本人删除相关内容。

[TOC]

时间服务器–NIO

服务端代码

服务端主程序

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TimeServer {
public static void main(String[] args) {
int port = 1234;
try {
TimeServerDispatcher timeServer;
timeServer = new TimeServerDispatcher(port);
new Thread(timeServer).start();
} catch (IOException e) {
e.printStackTrace();
}
}
}

服务端处理类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
 * Single-threaded NIO dispatcher for the time server.
 *
 * <p>One {@link Selector} multiplexes the listening channel and every accepted
 * client channel. For each request received, the current time formatted as
 * {@code yyyy-MM-dd HH:mm:ss} is written back to the client.
 */
public class TimeServerDispatcher implements Runnable {

    private final Selector selector;
    private final ServerSocketChannel ssc;
    private volatile boolean stop;
    // SimpleDateFormat is not thread-safe; within this class it is only used
    // from the request-handling path that run() drives.
    private final DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * Opens the selector and a non-blocking listening channel bound to {@code port}
     * (backlog 10000), and registers the channel for accept events.
     *
     * @param port TCP port to listen on
     * @throws IOException if the selector or channel cannot be opened, configured or bound
     */
    public TimeServerDispatcher(int port) throws IOException {
        selector = Selector.open();
        ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.socket().bind(new InetSocketAddress(port), 10000);
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("The time server is listening on port : " + port);
    }

    /** Asks the dispatch loop to exit; it notices the flag after the current select round. */
    public void stop() {
        this.stop = true;
    }

    /**
     * Runs the dispatch loop until {@link #stop()} is called, then closes the
     * listening channel and the selector.
     */
    @Override
    public void run() {
        while (!this.stop) {
            try {
                selector.select(1000);
                // Keys must be removed through the iterator: removing from the
                // selected-key set inside a for-each over that same set throws
                // ConcurrentModificationException once two or more keys are ready.
                // (Fully qualified so no additional import is required.)
                java.util.Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
                while (ready.hasNext()) {
                    SelectionKey k = ready.next();
                    ready.remove();
                    try {
                        doProcessRequest(k);
                    } catch (Exception e) {
                        // A failure on one channel only costs that channel.
                        closeKey(k);
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
        // Closing the selector does not close registered channels, so the
        // listening channel is released explicitly.
        try {
            ssc.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            selector.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Cancels {@code key} and closes its channel. A failure while closing is
     * printed instead of propagated so the remaining ready keys are still served.
     */
    private void closeKey(SelectionKey key) {
        if (key == null) {
            return;
        }
        key.cancel();
        if (key.channel() != null) {
            try {
                key.channel().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Handles one ready key: accepts a pending connection and/or reads a request
     * and answers it.
     *
     * @param key a key taken from the selected-key set
     * @throws IOException if accepting, reading or writing fails
     */
    private void doProcessRequest(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel client = server.accept();
            // In non-blocking mode accept() returns null when no connection is pending.
            if (client != null) {
                client.configureBlocking(false);
                client.register(selector, SelectionKey.OP_READ);
            }
        }
        if (key.isReadable()) {
            SocketChannel sc = (SocketChannel) key.channel();
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            int len = sc.read(readBuffer);
            if (len > 0) {
                readBuffer.flip();
                byte[] bytes = new byte[readBuffer.remaining()];
                readBuffer.get(bytes);
                String reqMsg = new String(bytes, "UTF-8");
                System.out.println("request msg is : " + reqMsg);
                String respMsg = this.df.format(new Date());
                doResponse(sc, respMsg);
            } else if (len < 0) {
                // End of stream: the peer closed the connection.
                key.cancel();
                sc.close();
            }
            // len == 0: nothing was read this round; wait for the next readiness event.
        }
    }

    /**
     * Writes {@code respMsg} to the client, encoded as UTF-8 to match the
     * decoding used for requests.
     *
     * <p>A single non-blocking {@code write} may send only part of the buffer;
     * any unsent remainder is not retried here.
     *
     * @param sc      channel to answer on
     * @param respMsg response text; {@code null} sends nothing
     * @throws IOException if the write fails
     */
    private void doResponse(SocketChannel sc, String respMsg) throws IOException {
        if (respMsg == null) {
            return;
        }
        byte[] bytes = respMsg.getBytes("UTF-8");
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        writeBuffer.put(bytes);
        writeBuffer.flip();
        sc.write(writeBuffer);
    }
}

客户端代码

客户端主程序

1
2
3
4
5
6
7
8
/**
 * Entry point of the NIO time client: runs a {@code TimeClientHandler}
 * targeting 127.0.0.1:1234 on its own thread.
 */
public class TimeClient {

    /**
     * Creates the handler and starts it on a new thread.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final int serverPort = 1234;
        TimeClientHandler handler = new TimeClientHandler("127.0.0.1", serverPort);
        Thread worker = new Thread(handler);
        worker.start();
    }
}

客户端处理程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/**
 * NIO client that connects to the time server, sends one request
 * ({@code "Hi Server !"}), prints the first response and then terminates.
 */
public class TimeClientHandler implements Runnable {

    private final String host;
    private final int port;
    private Selector selector;
    private SocketChannel socketChannel;
    // Set only when the selector and the non-blocking channel were all set up successfully.
    private boolean initialized;
    private volatile boolean stop;

    /**
     * Opens the selector and a non-blocking socket channel. A set-up failure is
     * printed; {@link #run()} then returns without attempting to connect.
     *
     * @param host server host name or address
     * @param port server TCP port
     */
    public TimeClientHandler(String host, int port) {
        this.host = host;
        this.port = port;
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            initialized = true;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Connects, sends the request and processes selector events until a response
     * has been printed or the connection fails or is closed; always releases the
     * channel and the selector before returning.
     */
    @Override
    public void run() {
        try {
            if (!initialized) {
                // Set-up failed in the constructor; there is nothing usable to connect with.
                return;
            }
            try {
                if (socketChannel.connect(new InetSocketAddress(host, port))) {
                    socketChannel.register(selector, SelectionKey.OP_READ);
                    doSendReqMsg(socketChannel, "Hi Server !");
                } else {
                    socketChannel.register(selector, SelectionKey.OP_CONNECT);
                }
            } catch (IOException e) {
                e.printStackTrace();
                // Without a connection attempt in flight the loop would wait forever.
                this.stop = true;
            }
            while (!this.stop) {
                try {
                    selector.select(1000);
                    // Keys must be removed through the iterator: removing from the
                    // selected-key set inside a for-each over that same set can throw
                    // ConcurrentModificationException.
                    // (Fully qualified so no additional import is required.)
                    java.util.Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
                    while (ready.hasNext()) {
                        SelectionKey k = ready.next();
                        ready.remove();
                        try {
                            doProcessResponse(k);
                        } catch (Exception e) {
                            // Report the failure, drop the only connection and leave
                            // the loop instead of spinning with nothing registered.
                            e.printStackTrace();
                            closeKey(k);
                            this.stop = true;
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    // A failing selector would fail again on every iteration.
                    this.stop = true;
                }
            }
        } finally {
            release();
        }
    }

    /** Cancels {@code key} and closes its channel, printing any failure while closing. */
    private void closeKey(SelectionKey key) {
        if (key == null) {
            return;
        }
        key.cancel();
        if (key.channel() != null) {
            try {
                key.channel().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Closes the socket channel and the selector if they were opened. */
    private void release() {
        if (socketChannel != null) {
            try {
                socketChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Handles one ready key: completes a pending connect (then sends the request)
     * and/or reads and prints the server's response.
     *
     * @param key a key taken from the selected-key set
     * @throws IOException if connecting, reading or writing fails
     */
    private void doProcessResponse(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        SocketChannel sc = (SocketChannel) key.channel();
        if (key.isConnectable()) {
            if (sc.finishConnect()) {
                sc.register(selector, SelectionKey.OP_READ);
                doSendReqMsg(sc, "Hi Server !");
            } else {
                throw new RuntimeException("连接失败");
            }
        }
        if (key.isReadable()) {
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            int readBytes = sc.read(readBuffer);
            if (readBytes > 0) {
                readBuffer.flip();
                byte[] bytes = new byte[readBuffer.remaining()];
                readBuffer.get(bytes);
                String respMsg = new String(bytes, "UTF-8");
                System.out.println("time : " + respMsg);
                this.stop = true;
            } else if (readBytes < 0) {
                // End of stream: the server closed the connection, so no response
                // can arrive any more and the loop must end.
                key.cancel();
                sc.close();
                this.stop = true;
            }
            // readBytes == 0: nothing was read this round; wait for the next readiness event.
        }
    }

    /**
     * Sends {@code reqMsg} to the server, encoded as UTF-8 to match the decoding
     * used for the response.
     *
     * <p>A single non-blocking {@code write} may send only part of the buffer;
     * any unsent remainder is not retried here.
     *
     * @param sc     connected channel to write to
     * @param reqMsg request text
     * @throws IOException if the write fails
     */
    private void doSendReqMsg(SocketChannel sc, String reqMsg) throws IOException {
        byte[] req = reqMsg.getBytes("UTF-8");
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        sc.write(writeBuffer);
    }
}

总结

这种模型有如下特点:

  • 客户端的连接操作是异步的
  • SocketChannel的读写操作是异步的
  • 一个Selector可以处理成千上万个客户端连接

但是:

  • 代码复杂
  • 这里的实现可能出现“读半包”、“写半包”的情况

参考资料: 《Netty权威指南》第二版