本文主要记录自己学习zookeeper时的一些个人笔记。不喜勿喷。

1 环境准备

随便建个java项目即可。

maven坐标:

1
2
3
4
5
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
</dependency>

2 同步方式调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
public class ZKTestSync {
public static String connectStr = "192.168.161.128:2181";
public ZooKeeper keeper = null;
private static Stat stat = new Stat();
private static Logger log = LoggerFactory.getLogger(ZKTestSync.class);
@Before
public void init() {
try {
keeper = new ZooKeeper(connectStr, 3000, (WatchedEvent event) -> {
log.info("事件:{}", event);
if (event.getState() == KeeperState.SyncConnected) {
if (event.getType() == EventType.None && event.getPath() == null) {
} else {
if (event.getType() == EventType.NodeChildrenChanged) {
log.info("节点{}发生变化", event.getPath());
} else if (EventType.NodeDataChanged == event.getType()) {
try {
log.info("节点{}数据发生变化", event.getPath());
this.keeper.getData(event.getPath(), false, stat);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
@After
public void close() {
if (keeper != null)
try {
keeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void testCreateNode() {
try {
String string = this.keeper.create("/node_3", "3".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(string);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void testGetChildren() {
try {
List<String> list = this.keeper.getChildren("/", false);
list.stream().forEach(System.out::println);
// true表示对节点的变化感兴趣,
// 在节点变化时在ZooKeeper构造函数传入的Watcher中可收到通知.
// 但是这种监听器只是一次性的
list = this.keeper.getChildren("/", true);
list.stream().forEach(System.out::println);
Thread.sleep(3 * 60 * 1000);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testGetData() {
try {
String str = new String(this.keeper.getData("/node_8", true, stat));
System.out.println(str);
Thread.sleep(3 * 60 * 1000);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testDelete() {
try {
this.keeper.delete("/node_40000000005", -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Test
public void testSetData() {
try {
Stat s = this.keeper.setData("/node_1", "ddd".getBytes(), -1);
System.out.println(s);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testSetACL() {
try {
ACL ipAcl = new ACL(Perms.CREATE | Perms.DELETE | Perms.READ, new Id("ip", "192.168.161.1"));
ACL digestAcl = new ACL(Perms.READ | Perms.WRITE, new Id("digest", DigestAuthenticationProvider.generateDigest("hylexus:123456")));
List<ACL> acls = Arrays.asList(ipAcl, digestAcl);
String string = this.keeper.create("/node_11", "8".getBytes(), acls, CreateMode.PERSISTENT);
System.out.println(string);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testGenerateDigest() throws NoSuchAlgorithmException {
System.out.println(DigestAuthenticationProvider.generateDigest("hylexus:123"));
}
}

3 异步方式调用

和上面的同步代码的最大区别就是,异步代码没法及时获取返回值,或者说他没有返回值。
只能通过提供回调函数的方式来处理操作完成后的工作。
但是有了回调函数,难免代码中有大量匿名类,为简单,此处使用java8的lambda代替匿名类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
public class ZKTestASync {
public static String connectStr = "192.168.161.128:2181";
public ZooKeeper keeper = null;
private static final Logger log = LoggerFactory.getLogger(ZKTestASync.class);
private static Stat stat = new Stat();
@Before
public void init() {
try {
keeper = new ZooKeeper(connectStr, 3000, (WatchedEvent event) -> {
log.info("事件:{}", event);
if (event.getState() == KeeperState.SyncConnected) {
if (event.getType() == EventType.None && event.getPath() == null) {
} else {
if (event.getType() == EventType.NodeChildrenChanged) {
log.info("节点{}发生变化", event.getPath());
} else if (EventType.NodeDataChanged == event.getType()) {
try {
log.info("节点{}数据发生变化", event.getPath());
this.keeper.getData(event.getPath(), false, stat);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
@After
public void close() {
if (keeper != null)
try {
keeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 异步创建
@Test
public void testCreateNodeASync() {
// 异步调用,没有返回值,通过回调函数处理结果
this.keeper.create("/node_4", "3".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, new StringCallback() {
/***
* @param rc:返回码
* @param path:要创建的节点的完整路径(想要创建的路径)
* @param ctx:create方法传入的上下文参数,此处是
* "testCreateNodeASync"
* @param name:返回的创建的真实路径(创建顺序节点时返回的真实路径和传入的路径是不同的)
*/
@Override
public void processResult(int rc, String path, Object ctx, String name) {
log.info("rc:{}", rc);
log.info("path:{}", path);
log.info("ctx:{}", ctx);
log.info("name:{}", name);
}
}, "testCreateNodeASync");
}
@Test
public void testGetChildren() {
try {
/***
* @param rc:返回码
* @param path:要创建的节点的完整路径(想要创建的路径)
* @param ctx:create方法传入的上下文参数
* @param children:子节点列表
* @param stat:节点状态
*/
this.keeper.getChildren("/", true, //
(int rc, String path, Object ctx, List<String> children, Stat stat) -> {
log.info("rc:{}", rc);
log.info("path:{}", path);
log.info("ctx:{}", ctx);
log.info("children:{}", children);
log.info("stat:{}", stat);
}, "这里可以传入任何Object作为上下文以便在回调函数函数中使用");
Thread.sleep(3 * 60 * 100);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testGetData() {
try {
this.keeper.getData("/node_1", true, (int rc, String path, Object ctx, byte[] data, Stat stat) -> {
log.info("rc:{}", rc);
log.info("path:{}", path);
log.info("ctx:{}", ctx);
log.info("data:{}", new String(data));
log.info("stat:{}", stat);
}, null);
Thread.sleep(3 * 60 * 1000);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testDelete() {
try {
this.keeper.delete("/node_6", -1, (int rc, String path, Object ctx) -> {
log.info("rc:{}", rc);
log.info("path:{}", path);
log.info("ctx:{}", ctx);
}, null);
Thread.sleep(3 * 60 * 1000);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testSetData() {
try {
this.keeper.setData("/node_1", "aaa".getBytes(), -1, (int rc, String path, Object ctx, Stat stat) -> {
log.info("rc:{}", rc);
log.info("path:{}", path);
log.info("ctx:{}", ctx);
log.info("stat:{}", stat);
}, null);
Thread.sleep(3 * 60 * 1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}

4 总结

  • 整个API还是简单易用的
  • 注册的监听器只是一次性的,没有提供类似于自动注册多次的API
  • session的超时重连可能导致watcher的重复执行,需要手动自己控制
  • 返回值中还有的是byte[],入参也有byte[]。操作不是很舒服

当然,也有zookeeper编程的其他框架可用,比如ZkClient等