ZooKeeper系列之(十一):客户端实现机制
< 返回列表时间: 2019-09-09来源:OSCHINA
【围观】麒麟芯片遭打压成绝版,华为亿元投入又砸向了哪里?>>>
笔者为什么要讲ZooKeeper的源码,对于程序员来说,光知道用是成为不了优秀的行家的,还要知道所以然,只有知道了内部实现机制,才能开拓眼界提高自我。而笔者认为ZooKeeper是最好的入门分布式系统的敲门砖。
好不多说,我们这里先看看客户端是怎么运转的。
1、概述
ZooKeeper客户端是连接到服务端集群,获取zk节点数据,监听zk节点数据变化的。zk节点就是znode,它是类似文件路径的东西,每个znode可以设置它的文本内容,当znode的文本内容被其他客户端修改后,所有监听该znode的客户端都会实时被通知到,这样的方式实现了分布式一致性存储。
在客户端里有一个ZooKeeper类,注意这里特指类名称。客户端通过Zookeeper类来发送命令给Zookeeper服务器。
ZooKeeper类中还可以设置Watcher,这就是znode监听者。Watcher可以指定监听哪个znode,当Zookeeper集群的znode节点状态发生变化时,服务端会发送通知消息给客户端的Watcher。
Watcher又可以细分为3种Watcher子类:DataWatcher,ExistWatcher和ChildWatcher。根据字面意思就能猜得出来,DataWatcher是znode的数据变化时触发,ExistWatcher是znode的创建删除时触发,ChildWatcher是在znode下创建子目录(也是znode)时触发。实际生产环境中用的最多的还是DataWatcher。
下面我们先分析分析ZooKeeper类的实现,至于Watcher的实现后面会有专门介绍。
2、通信机制
客户端与服务端交互的数据流大致如下:

首先是客户端ZooKeeper类发起命令请求,然后通过ClientCntx发送给服务端集群。ClientCnxn是上层类,屏蔽了具体的网络通信流程,网络通过是ClientCntxSocketNetty实现的,服务端是ZooKeeperServer。 以create命令(创建znode)为例,ZooKeeper类会构造Packet,将请求数据封装在Packet里。然后调用ClientCnxn的submitRequest方法。ClientCnxn的submitRequest方法调用queuePacket方法将Packet放入outgoingQueue队列中,然后线程执行wait方法挂起等待服务端返回。 ClientCnxnSocketNetty和ClientCnxn共享同一个outgoingQueue,ClientCnxnSocketNetty启动了发送守护进程,当outgoingQueue队列中有Packet时,会自动将该Packet发送给ZooKeeperServer。同时ClientCnxnSocketNetty启动接收线程实时接收ZooKeeperServer的返回数据,返回数据触发ClientCnxnSocketNetty中启动的ZKClientHandler的MessageReceived事件。 在MessageReceived事件中回调ClientCnxn中的SendThread类的readResponse方法。 readResponse方法中最后调用finishPacket方法唤醒在该Packet上wait的线程,也就是发起submitRequest的方法,使得submitRequest方法返回到ZooKeeper类。 客户端请求过程结束。
3、ZooKeeper类
客户端在构造函数阶段创建ClientCnxn 与服务端连接,后续命令都通过ClientCntx发送给服务端。ClientCnxn是客户端与服务端通信的底层接口,它和ClientCnxnSocketNetty一起工作提供网络通信服务。
服务端是ZooKeeperServer类,收到ClientCnxn的Request请求包后调用相应的处理逻辑,返回结果再通过ClientCnxc发送给客户端。
ClientCntx连接时可以同时指定多台服务器地址,根据一定的算法挑选某一个服务器地址进行连接,当某个服务器发生故障无法连接时,会自动连接其他的服务器。实现这一机制的是HostProdiver接口,实现用StaticHostProvider类。
ZooKeeper类的构造函数如下: public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException { if (clientConfig == null) { clientConfig = new ZKClientConfig(); } this.clientConfig = clientConfig; watchManager = defaultWatchManager(); watchManager.defaultWatcher = watcher; ConnectStringParser connectStringParser = new ConnectStringParser(connectString); hostProvider = aHostProvider; cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); cnxn.start(); }
这里的connectString是连接字符串,aHostProvider是管理服务端列表的。watcher是监听器。
为什么有aHostProvider?客户端可以配置多个服务端地址,这样当某个服务端挂掉的时候,客户端会自动尝试连接其他的服务端,实现分布式可靠性。
创建了ZooKeeper对象后就可以调用具体的读写数据的方法了,下面列举常见方法的实现机制。
create方法根据输入参数构造出CreateRequest包,然后通过submitRequest方法传递给服务端,submitRequest方法将CreateRequest转换成Packet包并调用sendPacket方法将发送包放入队列,等待发送线程发送给服务端。
服务端响应完成后会将返回结果填充到CreateResponse实体中返回给客户端。
4、发送命令
我们选取getData方法,来看看客户端的内部机制,其他命令的处理过程是类似的,不同的只是命令类型不同而已。
getData方法从服务端读取znode的数据,入参同时包括watcher,这样在znode数据被其他客户端修改后,会实时回调watcher来使得所有客户端同步本次变化。
先给出getData的代码: public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx) { final String clientPath = path; PathUtils.validatePath(clientPath); WatchRegistration wcb = null; if (watcher != null) { wcb = new DataWatchRegistration(watcher, clientPath); } final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getData); GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); request.setWatch(watcher != null); GetDataResponse response = new GetDataResponse(); cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, clientPath, serverPath, ctx, wcb); }
这里干了几件事情呢?主要干了两件事。
(1)注册watcher,这个很好理解,至于watcher的细节会在其他文章里专栏叙说。
(2)构建完整的znode的路径名,从根目录开始。然后将znode的路径名和GetDataRequest类型打包放到ClientCnxn的发送队列里,等待排队发往服务端。
其他命令的处理过程是类似的,不同的只是命令类型不同而已,对应到代码里是不同的Request对象。getData命令对应GetDataRequest类;Exists方法对应ExistsRequest类。他们的父类却是同一个。ZooKeeper支持的Request类主要有以下这些: create:CreateRequest delete:DeleteRequest exists:ExistsRequest getData:GetDataRequest setData:SetDataRequest getChildren:GetChildrenRequest
对于create命令来说,和GetData有一点不同。不同点在于以下两点:
(1)create命令是立即返回结果的,而getData等其他命令是异步返回结果的。getData入参里的DataCallback参数就是异步回调处理方法。
(2)create是调用ClientCnxn的submitRequest方法启动发送命令过程,而getData等其他方法是调用ClientCnxn的queuePacket方法将请求命令缓存在队列里,等待发送线程异步发送。

5、ClientCnxn
前面我们看到ZooKeeper类的命令发送都是通过ClientCnxn类实现的。这里就谈谈ClientCnxn类干了哪些活。
Clientcnxn将客户端请求加入发送队列,等待sendThread发送。eventThread负责处理Server返回的WatchedEvent,以及回调注册的客户端事件接口处理函数。
5.1 queuePacket
这是ClientCnxn里最重要的一个方法,它将请求包放入发送队列outgoingQueue中,等待发送线程发送给服务端。 public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration atchRegistration, WatchDeregistration watchDeregistration) { Packet packet = null; packet = new Packet(h, r, request, response, watchRegistration); packet.cb = cb; packet.ctx = ctx; packet.clientPath = clientPath; packet.serverPath = serverPath; packet.watchDeregistration = watchDeregistration; synchronized (state) { if (!state.isAlive() || closing) { conLossPacket(packet); } else { if (h.getType() == OpCode.closeSession) { closing = true; } outgoingQueue.add(packet); } } sendThread.getClientCnxnSocket().packetAdded(); return packet; }
最后告诉SendThread数据已经放好了,至于何时发送就等SendThread自己来决定了。
5.2 submitRequest
提交客户端请求到服务端,这是立即返回的方法,如果请求包没处理完则一直等待下去。submitRequest方法主要用在create命令。 ReplyHeader r = new ReplyHeader(); Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration); synchronized (packet) { while (!packet.finished) { packet.wait(); } } return r;
5.3 sendPacket
sendPacket构建Packet,然后调用发送线程SendThread里的同名sendPacket方法来发送数据到服务端。 public void sendPacket(Record request, Record response, AsyncCallback cb, int opCode) throws IOException { int xid = getXid(); RequestHeader h = new RequestHeader(); h.setXid(xid); h.setType(opCode); ReplyHeader r = new ReplyHeader(); r.setXid(xid); Packet p = new Packet(h, r, request, response, null, false); p.cb = cb; sendThread.sendPacket(p); }
5.4 finishPacket
该方法在接收到服务端的响应时,唤醒等待响应的客户端线程,通过调用Packet的notifyAll方法来唤醒wait在该Packet上的线程。
如果客户端请求指定了Watcher,则同时生成WatchedEvent事件并放入事件队列,等待eventThread线程处理。
代码片段: private void finishPacket(Packet p) { if (p.cb == null) { synchronized (p) { p.finished = true; p.notifyAll(); } } else { p.finished = true; eventThread.queuePacket(p); } }
5.5 readResponse
ClientCnxnSocketNetty收到服务端响应后触发ZKClientHandler的messageReceived事件,在该事件处理逻辑中调用sendThread的readResponse方法获取服务端响应。
如果服务端响应的是WatchedEvent事件,则将事件放入eventThread中等候调度执行事件方法。
如果服务端响应的是客户端命令结果,则将Packet从发送队列删除,最后调用CientCnxn的finishPacket方法完成最后的处理,finishPacket方法在前面已经讲过了。
readResponse的主要代码如下: void readResponse(ByteBuffer incomingBuffer) throws IOException { ByteBufferInputStream bbis = new ByteBufferInputStream( incomingBuffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); ReplyHeader replyHdr = new ReplyHeader(); replyHdr.deserialize(bbia, "header"); if (replyHdr.getXid() == -1) { // -1 means notification WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); // convert from a server path to a client path if (chrootPath != null) { String serverPath = event.getPath(); if(serverPath.compareTo(chrootPath)==0) event.setPath("/"); else if (serverPath.length() > chrootPath.length()) event.setPath(serverPath.substring(chrootPath.length())); } WatchedEvent we = new WatchedEvent(event); eventThread.queueEvent( we ); return; } Packet packet; try { packet.replyHeader.setXid(replyHdr.getXid()); packet.replyHeader.setErr(replyHdr.getErr()); packet.replyHeader.setZxid(replyHdr.getZxid()); if (replyHdr.getZxid() > 0) { lastZxid = replyHdr.getZxid(); } } finally { finishPacket(packet); } }
热门排行