
Transaction Requests and Watcher Source Code Analysis


The Basic Watcher Flow

ZooKeeper's Watcher mechanism can broadly be divided into three phases: the client registers the Watcher, the server processes the Watcher, and the client invokes the Watcher callback.

A client can register a watcher in three ways: getData, exists, and getChildren. We will use the code below as the running example to analyze how the whole trigger mechanism works.

  1. getData: reads a node's data and sets a watch;
  2. exists: checks whether a node exists and sets a watch;
  3. getChildren: lists all children of a node and sets a watch.

Add the zkclient dependency (which pulls in the ZooKeeper client transitively) and issue a data operation with the native ZooKeeper API:

<dependency> 
    <groupId>com.101tec</groupId> 
    <artifactId>zkclient</artifactId> 
    <version>0.10</version> 
</dependency> 

public class Test implements Watcher{

    static ZooKeeper zooKeeper;

    static {
        try {
            zooKeeper = new ZooKeeper("localhost:2181", 4000,new Test());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("eventType:"+event.getType());
        if(event.getType()==Event.EventType.NodeDataChanged){
            try {
                // ZooKeeper only fires a watch once by default;
                // re-register after receiving the event to keep watching in a loop
                zooKeeper.exists(event.getPath(),true);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    // getData() / exists / getChildren
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        String path="/watcher";
        if(zooKeeper.exists(path,false)==null) {
            zooKeeper.create("/watcher", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        Thread.sleep(1000);
        System.out.println("-----------");
        Stat stat=zooKeeper.exists(path,true); // true means: use the default watcher configured on this ZooKeeper instance

        System.in.read();
    }
}
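
To see the watcher fire, change the node's data while the program above is blocked on System.in.read(). A minimal trigger, run from a second client or thread against the same localhost:2181 server (a sketch; TriggerTest is a hypothetical helper class, not part of the original example):

public class TriggerTest {
    public static void main(String[] args) throws Exception {
        ZooKeeper writer = new ZooKeeper("localhost:2181", 4000, null);
        // -1 skips the version check; this update fires NodeDataChanged on /watcher
        writer.setData("/watcher", "1".getBytes(), -1);
        writer.close();
    }
}

Each setData call produces exactly one NodeDataChanged callback; the exists(event.getPath(), true) call inside process() re-arms the watch for the next change.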

Initialization of the ZooKeeper API

ZooKeeper zookeeper = new ZooKeeper("192.168.11.152:2181", 4000, new Watcher() { 
    public void process(WatchedEvent event) { 
        System.out.println("event.type: " + event.getType()); 
    } 
});

When creating a ZooKeeper client instance, we pass a default Watcher into the constructor via new Watcher(). This Watcher serves as the default Watcher for the entire ZooKeeper session and is kept in the defaultWatcher field of the client-side ZKWatchManager. The code is as follows:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,  boolean canBeReadOnly, HostProvider aHostProvider,  ZKClientConfig clientConfig) throws IOException { 
    LOG.info("Initiating client connection, connectString=" + connectString + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher); 

    if (clientConfig == null) { 
        clientConfig = new ZKClientConfig(); 
    } 
    this.clientConfig = clientConfig; 
    watchManager = defaultWatchManager(); 

    watchManager.defaultWatcher = watcher; // this is where the default watcher is stored in ZKWatchManager 
    ConnectStringParser connectStringParser = new ConnectStringParser( 
        connectString); 
    hostProvider = aHostProvider; 
    // create the ClientCnxn and call cnxn.start() 
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(), 
                          hostProvider, sessionTimeout, this, watchManager, 
                          getClientCnxnSocket(), canBeReadOnly); 
    cnxn.start(); 
} 
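
Note that defaultWatchManager() simply constructs the client-side ZKWatchManager; this is the same object whose dataWatches / existWatches / childWatches maps we will meet again on the callback path below.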

ClientCnxn is the main class through which the ZooKeeper client communicates with the server and handles event notifications. It contains two inner classes:

  1. SendThread: handles the network traffic between client and server, including the transport of event notifications;
  2. EventThread: invokes the Watchers registered on the client side to deliver notifications.

ClientCnxn initialization

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) { 
    this.zooKeeper = zooKeeper; 
    this.watcher = watcher; 
    this.sessionId = sessionId; 
    this.sessionPasswd = sessionPasswd; 
    this.sessionTimeout = sessionTimeout; 
    this.hostProvider = hostProvider; 
    this.chrootPath = chrootPath; 

    connectTimeout = sessionTimeout / hostProvider.size(); 
    readTimeout = sessionTimeout * 2 / 3; 
    readOnly = canBeReadOnly; 
    sendThread = new SendThread(clientCnxnSocket); // create the SendThread 
    eventThread = new EventThread(); // create the EventThread 
    this.clientConfig=zooKeeper.getClientConfig(); 
} 
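
As a concrete example of the timeout computation: with the sessionTimeout of 4000 ms from our test code and a single address in the connect string, connectTimeout works out to 4000 / 1 = 4000 ms and readTimeout to 4000 * 2 / 3, roughly 2666 ms.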

public void start() { // start both threads 
    sendThread.start(); 
    eventThread.start(); 
} 

How the Server Receives and Processes Requests

On the server side, the NIOServerCnxn class handles the requests sent over by clients.

NIOServerCnxn

NIOServerCnxn hands the packet over to the ZooKeeperServer: zks.processPacket(this, bb)
processes the data packet sent by the client.

public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException{
    // We have the request, now process and setup for next
    InputStream bais = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    /**
     * RequestHeader h = new RequestHeader();
     * h.setType(ZooDefs.OpCode.exists);
     */
    RequestHeader h = new RequestHeader();
    // deserialize the header
    h.deserialize(bia, "header");
    // Through the magic of byte buffers, txn will not be
    // pointing
    // to the start of the txn
    incomingBuffer = incomingBuffer.slice();
    if (h.getType() == OpCode.auth) {
        LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
        AuthPacket authPacket = new AuthPacket();
        ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
        String scheme = authPacket.getScheme();
        AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
        Code authReturn = KeeperException.Code.AUTHFAILED;
        if(ap != null) {
            try {
                authReturn = ap.handleAuthentication(cnxn, authPacket.getAuth());
            } catch(RuntimeException e) {
                LOG.warn("Caught runtime exception from AuthenticationProvider: " + scheme + " due to " + e);
                authReturn = KeeperException.Code.AUTHFAILED;                   
            }
        }
        if (authReturn!= KeeperException.Code.OK) {
            if (ap == null) {
                LOG.warn("No authentication provider for scheme: " + scheme + " has " + ProviderRegistry.listProviders());
            } else {
                LOG.warn("Authentication failed for scheme: " + scheme);
            }
            // send a response...
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
            cnxn.sendResponse(rh, null, null);
            // ... and close connection
            cnxn.sendBuffer(ServerCnxnFactory.closeConn);
            cnxn.disableRecv();
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Authentication succeeded for scheme: " + scheme);
            }
            LOG.info("auth success " + cnxn.getRemoteSocketAddress());
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
            cnxn.sendResponse(rh, null, null);
        }
        return;
    } else {
        if (h.getType() == OpCode.sasl) {
            Record rsp = processSasl(incomingBuffer,cnxn);
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
            cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?
            return;
        }
        else {
            // exists requests take this branch
            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
            si.setOwner(ServerCnxn.me);
            submitRequest(si);
        }
    }
    cnxn.incrOutstandingRequests(h);
}

submitRequest

Submits the current request on the server side.

public void submitRequest(Request si) {
    if (firstProcessor == null) {
        synchronized (this) {
            try {
                // Since all requests are passed to the request
                // processor it should wait for setting up the request
                // processor chain. The state will be updated to RUNNING
                // after the setup.
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (firstProcessor == null || state != State.RUNNING) {
                throw new RuntimeException("Not started");
            }
        }
    }
    try {
        touch(si.cnxn);
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type " + si.type);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dropping request: " + e.getMessage());
        }
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request:" + e.getMessage(), e);
    }
}

How the firstProcessor request chain is assembled

firstProcessor is initialized in ZooKeeperServer.setupRequestProcessors(), shown below:

protected void setupRequestProcessors() { 
    RequestProcessor finalProcessor = new FinalRequestProcessor(this); 
    RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); 
    ((SyncRequestProcessor)syncProcessor).start(); 
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    // note that PrepRequestProcessor is handed the syncProcessor 
    ((PrepRequestProcessor)firstProcessor).start(); 
}

From the code above, firstProcessor is a PrepRequestProcessor, and its constructor receives another processor, forming a chain:
RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
syncProcessor's constructor in turn receives yet another processor, the FinalRequestProcessor.

So the whole chain is PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor, as sketched below.
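
The wiring is a classic chain of responsibility: each processor does its part of the work and hands the request to the next one. Here is a minimal standalone sketch of the pattern, using simplified stand-in types rather than the real ZooKeeper classes:

public class ChainDemo {
    interface Processor {
        void process(String request);
    }

    // plays the role of FinalRequestProcessor: the end of the chain
    static class Terminal implements Processor {
        public void process(String request) {
            System.out.println("final: " + request);
        }
    }

    // plays the role of Prep/SyncRequestProcessor: do some work, then hand off
    static class Forwarding implements Processor {
        private final String name;
        private final Processor next;

        Forwarding(String name, Processor next) {
            this.name = name;
            this.next = next;
        }

        public void process(String request) {
            System.out.println(name + ": " + request);
            next.process(request); // hand the request down the chain
        }
    }

    public static void main(String[] args) {
        // the wiring mirrors setupRequestProcessors(): build the innermost processor first
        Processor first = new Forwarding("prep", new Forwarding("sync", new Terminal()));
        first.process("exists /watcher");
    }
}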

PrepRequestProcessor.processRequest(si)

Now that the chain is clear, let's follow firstProcessor.processRequest(si), which
lands in PrepRequestProcessor:

public void processRequest(Request request) { 
   submittedRequests.add(request); 
} 

Oddly enough, processRequest merely adds the request to submittedRequests. From what we have seen before, this is clearly another asynchronous hand-off, and submittedRequests is a blocking queue:

LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();

Since PrepRequestProcessor also extends a thread class, we go straight to its run method:

public void run() { 
    try { 
        while (true) {  
            Request request = submittedRequests.take(); // take a request from the queue for processing 
            long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK; 
            if (request.type == OpCode.ping) { 
                traceMask = ZooTrace.CLIENT_PING_TRACE_MASK; 
            } 
            if (LOG.isTraceEnabled()) { 
                ZooTrace.logRequest(LOG, traceMask, 'P', request, ""); 
            } 
            if (Request.requestOfDeath == request) { 
                break; 
            } 
            pRequest(request); // call pRequest for pre-processing 
        } 
    } catch (RequestProcessorException e) { 
        if (e.getCause() instanceof XidRolloverException) { 
            LOG.info(e.getCause().getMessage()); 
        } 
        handleException(this.getName(), e); 
    } catch (Exception e) { 
        handleException(this.getName(), e); 
    } 
    LOG.info("PrepRequestProcessor exited loop!"); 
} 

pRequest

The pre-processing code is too long to paste in full. Most of it branches on the current OP type and handles each case; the very last line of the method is the one we care about:

nextProcessor.processRequest(request);

SyncRequestProcessor.processRequest

public void processRequest(Request request) { 
    // request.addRQRec(">sync"); 
    queuedRequests.add(request); 
} 

Same pattern again: the request is handed off asynchronously by adding it to queuedRequests, so we look for the run method in this class:

public void run() { 
    try { 
        int logCount = 0;
        // we do this in an attempt to ensure that not all of the servers
        // in the ensemble take a snapshot at the same time
        int randRoll = r.nextInt(snapCount/2); 
        while (true) { 
            Request si = null; 
            // take a request from the blocking queue 
            if (toFlush.isEmpty()) { 
                si = queuedRequests.take();   
            } else { 
                si = queuedRequests.poll(); 
                if (si == null) { 
                    flush(toFlush); 
                    continue; 
                } 
            } 
            if (si == requestOfDeath) { 
                break; 
            } 
            if (si != null) { 
                // track the number of records written to the log 
                // roughly speaking, this block triggers a snapshot and starts a snapshot thread 
                if (zks.getZKDatabase().append(si)) { 
                    logCount++; 
                    if (logCount > (snapCount / 2 + randRoll)) { 
                        randRoll = r.nextInt(snapCount/2); 

                        // roll the log 
                        zks.getZKDatabase().rollLog(); 
                        // take a snapshot 
                        if (snapInProcess != null && snapInProcess.isAlive()) { 
                            LOG.warn("Too busy to snap, skipping"); 
                        } else { 
                            snapInProcess = new ZooKeeperThread("Snapshot Thread") { 
                                public void run() { 
                                    try { 
                                        zks.takeSnapshot(); 
                                    } catch(Exception e) { 
                                        LOG.warn("Unexpected exception", e); 
                                    } 
                                } 
                            }; 
                            snapInProcess.start(); 
                        } 
                        logCount = 0; 
                    } 
                } else if (toFlush.isEmpty()) { 
                    // optimization for read heavy workloads 
                    // iff this is a read, and there are no pending 
                    // flushes (writes), then just pass this to the next 
                    // processor 
                    if (nextProcessor != null) { 
                        nextProcessor.processRequest(si); // hand the request on to the next processor 
                        if (nextProcessor instanceof Flushable) { 
                            ((Flushable)nextProcessor).flush(); 
                        } 
                    } 
                    continue; 
                } 
                toFlush.add(si); 
                if (toFlush.size() > 1000) { 
                    flush(toFlush); 
                } 
            } 
        } 
    } catch (Throwable t) { 
        handleException(this.getName(), t); 
    } finally{ 
        running = false; 
    } 
    LOG.info("SyncRequestProcessor exited!"); 
} 

FinalRequestProcessor.processRequest

This is the method we examined earlier: FinalRequestProcessor.processRequest updates the in-memory session information or znode data according to the operation carried in the Request object.

The method runs close to 300 lines, so it is not reproduced in full; we go straight to the key code selected by the client's OP type:

case OpCode.exists: { 
    lastOp = "EXIS"; 
    // TODO we need to figure out the security requirement for this! 
    ExistsRequest existsRequest = new ExistsRequest(); 
    // deserialize: turn the ByteBuffer back into an ExistsRequest; this is the Request object the client sent 
    ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest); 
    String path = existsRequest.getPath(); // the requested path 
    if (path.indexOf('\0') != -1) { 
        throw new KeeperException.BadArgumentsException(); 
    } 
    // the key code: check the request's getWatch flag; if set, pass cnxn (the ServerCnxn) as the watcher 
    // an exists request needs to watch for data changes, so the watcher is added here 
    Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null); 
    rsp = new ExistsResponse(stat); // look the path up in the server's in-memory database and wrap the result in an ExistsResponse 
    break; 
} 
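
For reference, DataTree.statNode has roughly the following shape (abridged from the 3.4.x sources; treat it as a sketch): when a non-null watcher is passed in, it is added to the server-side watch table before the Stat is copied out.

public Stat statNode(String path, Watcher watcher) throws KeeperException.NoNodeException {
    Stat stat = new Stat();
    DataNode n = nodes.get(path);
    if (watcher != null) {
        dataWatches.addWatch(path, watcher); // this is where the ServerCnxn ends up stored as a watcher
    }
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (n) {
        n.copyStat(stat);
        return stat;
    }
}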

Summary

Putting it together, the server-side call chain is: NIOServerCnxn -> ZooKeeperServer.processPacket -> submitRequest -> PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor.

How the Client Receives the Server's Response

ClientCnxnSocketNIO.doIO

After the server finishes processing, it sends the response back via NIOServerCnxn.sendResponse. The client receives it in ClientCnxnSocketNIO.doIO; note SendThread.readResponse, which reads the data returned by the server.

void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, 
          ClientCnxn cnxn) throws InterruptedException, IOException {  
    SocketChannel sock = (SocketChannel) sockKey.channel();  
    if (sock == null) {  
        throw new IOException("Socket is null!");  
    }  
    if (sockKey.isReadable()) {  
        int rc = sock.read(incomingBuffer);  
        if (rc < 0) {  
            throw new EndOfStreamException("Unable to read additional data from server sessionid 0x" + Long.toHexString(sessionId) + ", likely server has closed socket");
        }  
        if (!incomingBuffer.hasRemaining()) {  
            incomingBuffer.flip();  
            if (incomingBuffer == lenBuffer) {  
                recvCount++;  
                readLength();  
            } else if (!initialized) {  
                readConnectResult();  
                enableRead();  
                if (findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {  
                    // Since SASL authentication has completed (if client is configured to do so),  
                    // outgoing packets waiting in the outgoingQueue can now be sent.  
                    enableWrite();  
                }  
                lenBuffer.clear();  
                incomingBuffer = lenBuffer;  
                updateLastHeard();  
                initialized = true;  
            } else {  
                sendThread.readResponse(incomingBuffer);  
                lenBuffer.clear();  
                incomingBuffer = lenBuffer;  
                updateLastHeard();  
            }  
        }  
    }  
    // ... the writable branch of the method is omitted here ... 
}

SendThread.readResponse

The main flow of this method is:

  1. read the header; if its xid == -2, this is the response to a ping: return;
  2. if the xid is -4, this is the response to an AuthPacket: return;
  3. if the xid is -1, this is a notification: read on, build an event, dispatch it via EventThread.queueEvent, and return;
  4. otherwise: take a Packet from pendingQueue, then validate and update the packet with the reply.

void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(
        incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();

    replyHdr.deserialize(bbia, "header"); // deserialize the header 
    if (replyHdr.getXid() == -2) {
        // -2 is the xid for pings
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got ping response for sessionid: 0x" + Long.toHexString(sessionId) + " after " + ((System.nanoTime() - lastPingSentNs) / 1000000) + "ms");
        }
        return;
    }
    if (replyHdr.getXid() == -4) {
        // -4 is the xid for AuthPacket               
        if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
            state = States.AUTH_FAILED;                    
            eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
                                                     Watcher.Event.KeeperState.AuthFailed, null) );                                      
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got auth sessionid:0x" + Long.toHexString(sessionId));
        }
        return;
    }
    if (replyHdr.getXid() == -1) { // the current message is a notification (an event pushed by the server) 
        // -1 means notification
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got notification sessionid:0x" + Long.toHexString(sessionId));
        }
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response");

        // convert from a server path to a client path
        if (chrootPath != null) {
            String serverPath = event.getPath();
            if(serverPath.compareTo(chrootPath)==0)
                event.setPath("/");
            else if (serverPath.length() > chrootPath.length())
                event.setPath(serverPath.substring(chrootPath.length()));
            else {
                LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + chrootPath);
            }
        }

        WatchedEvent we = new WatchedEvent(event);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(sessionId));
        }

        eventThread.queueEvent( we );
        return;
    }

    // If SASL authentication is currently in progress, construct and
    // send a response packet immediately, rather than queuing a
    // response as with other packets.
    if (clientTunneledAuthenticationInProgress()) {
        GetSASLRequest request = new GetSASLRequest();
        request.deserialize(bbia,"token");
        zooKeeperSaslClient.respondToServer(request.getToken(),
                                            ClientCnxn.this);
        return;
    }

    Packet packet;
    synchronized (pendingQueue) {
        if (pendingQueue.size() == 0) {
            throw new IOException("Nothing in the queue, but got "
                                  + replyHdr.getXid());
        }
        packet = pendingQueue.remove(); // this packet has received its response, so remove it from pendingQueue 
    }
    /*
     * Since requests are processed in order, we better get a response
     * to the first request!
     */
    try { // validate the packet; on success, update it with the information from the server's reply 
        if (packet.requestHeader.getXid() != replyHdr.getXid()) {
            packet.replyHeader.setErr(
                KeeperException.Code.CONNECTIONLOSS.intValue());
            throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid() + " with err " + + replyHdr.getErr() + " expected Xid " + packet.requestHeader.getXid() + " for a packet with details: " + packet );
        }

        packet.replyHeader.setXid(replyHdr.getXid());
        packet.replyHeader.setErr(replyHdr.getErr());
        packet.replyHeader.setZxid(replyHdr.getZxid());
        if (replyHdr.getZxid() > 0) {
            lastZxid = replyHdr.getZxid();
        }
        if (packet.response != null && replyHdr.getErr() == 0) {
            packet.response.deserialize(bbia, "response"); // deserialize the server's response into packet.response; this is why exists() can read its result from packet.response on its last line 
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Reading reply sessionid:0x" + Long.toHexString(sessionId) + ", packet:: " + packet);
        }
    } finally {
        finishPacket(packet); // finally, finishPacket completes the processing 
    }
}

The finishPacket method

Its main job is to take the corresponding Watcher out of the Packet and register it with ZKWatchManager.

private void finishPacket(Packet p) { 
    int err = p.replyHeader.getErr();   
    if (p.watchRegistration != null) { 
        p.watchRegistration.register(err); // register the watcher with ZKWatchManager. Remember watchRegistration? It was created when the request was assembled; the subclass puts the Watcher instance into the matching map of ZKWatchManager (e.g. existWatches). 
    } 
    // queue events for every removed watch, so the client can receive
    // the "data/child watch removed" event types 
    if (p.watchDeregistration != null) { 
        Map<EventType, Set<Watcher>> materializedWatchers = null; 
        try { 
            materializedWatchers = p.watchDeregistration.unregister(err); 
            for (Entry<EventType, Set<Watcher>> entry : materializedWatchers.entrySet()) { 
                Set<Watcher> watchers = entry.getValue(); 
                if (watchers.size() > 0) { 
                    queueEvent(p.watchDeregistration.getClientPath(), err, watchers, entry.getKey()); 
                    // ignore connectionloss when removing from local 
                    // session 
                    p.replyHeader.setErr(Code.OK.intValue()); 
                } 
            } 
        } catch (KeeperException.NoWatcherException nwe) { 
            p.replyHeader.setErr(nwe.code().intValue()); 
        } catch (KeeperException ke) { 
            p.replyHeader.setErr(ke.code().intValue()); 
        } 
    } 

    // cb is the AsyncCallback; if it is null the call was synchronous and no async callback is needed, so we just notifyAll. 
    if (p.cb == null) { 
        synchronized (p) { 
            p.finished = true; 
            p.notifyAll(); 
        } 
    } else { 
        p.finished = true; 
        eventThread.queuePacket(p); 
    } 
} 

watchRegistration

public void register(int rc) { 
    if (shouldAddWatch(rc)) { 
        Map<String, Set<Watcher>> watches = getWatches(rc); 
        // the subclass implementation returns the matching map, e.g. existWatches in ZKWatchManager 
        synchronized(watches) { 
            Set<Watcher> watchers = watches.get(clientPath); 
            if (watchers == null) { 
                watchers = new HashSet<Watcher>(); 
                watches.put(clientPath, watchers); 
            } 
            watchers.add(watcher); // put the Watcher into the matching map (e.g. existWatches) of ZKWatchManager 
        } 
    } 
} 
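
The getWatches(rc) indirection is what routes the watcher to the right map. For the exists call, the WatchRegistration subclass looks roughly like this (abridged from the 3.4.x client sources; a sketch, not verbatim). Note the detail: when the node exists (rc == 0) the watcher is filed under dataWatches, and only a NONODE result lands it in existWatches.

class ExistsWatchRegistration extends WatchRegistration {
    public ExistsWatchRegistration(Watcher watcher, String clientPath) {
        super(watcher, clientPath);
    }

    @Override
    protected Map<String, Set<Watcher>> getWatches(int rc) {
        // node exists -> dataWatches; node missing (NONODE) -> existWatches
        return rc == 0 ? watchManager.dataWatches : watchManager.existWatches;
    }

    @Override
    protected boolean shouldAddWatch(int rc) {
        // register on success, and also when the node does not exist yet
        return rc == 0 || rc == KeeperException.Code.NONODE.intValue();
    }
}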

The code below shows the client-side maps that store watchers, one per registration type (dataWatches for getData, existWatches for exists, childWatches for getChildren):

static class ZKWatchManager implements ClientWatchManager { 
    private final Map<String, Set<Watcher>> dataWatches = 
        new HashMap<String, Set<Watcher>>(); 
    private final Map<String, Set<Watcher>> existWatches = 
        new HashMap<String, Set<Watcher>>(); 
    private final Map<String, Set<Watcher>> childWatches = 
        new HashMap<String, Set<Watcher>>(); 

    // ... (remaining members omitted)
} 

In short, whenever you register a Watcher through the ZooKeeper constructor or through the getData, exists, or getChildren interfaces, the request is first sent to the server; once the server has acknowledged it, the client stores the mapping from path to Watcher for later use.

EventThread.queuePacket()

finishPacket eventually calls eventThread.queuePacket, adding the current packet to the queue of events waiting to be delivered:

public void queuePacket(Packet packet) { 
    if (wasKilled) { 
        synchronized (waitingEvents) { 
            if (isRunning) waitingEvents.add(packet); 
            else processEvent(packet); 
        } 
    } else { 
        waitingEvents.add(packet); 
    } 
} 
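
The wasKilled branch matters during shutdown: once the eventOfDeath marker has been consumed (see EventThread.run below), a packet is only queued while the thread is still draining its queue; after that it is processed inline via processEvent.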

Event Triggering

Everything above, long as it is, only explains how an event is registered; the actual firing has to come from a transactional operation.

In our opening example, the event would be triggered by code such as:

zooKeeper.setData("/mic", "1".getBytes(), -1); // modifying the node's data triggers the watch 

The client/server interaction is the same as before, so it is not repeated; the only difference is that this operation triggers the server-side event handling in DataTree.setData():

public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException { 
    Stat s = new Stat(); 
    DataNode n = nodes.get(path); 
    if (n == null) { 
        throw new KeeperException.NoNodeException(); 
    } 
    byte lastdata[] = null; 
    synchronized (n) { 
        lastdata = n.data; 
        n.data = data; 
        n.stat.setMtime(time); 
        n.stat.setMzxid(zxid); 
        n.stat.setVersion(version); 
        n.copyStat(s); 
    } 
    // now update if the path is in a quota subtree. 
    String lastPrefix = getMaxPrefixWithQuota(path); 
    if(lastPrefix != null) { 
        this.updateBytes(lastPrefix, (data == null ? 0 : data.length)  - (lastdata == null ? 0 : lastdata.length)); 
    } 
    dataWatches.triggerWatch(path, EventType.NodeDataChanged); // trigger the NodeDataChanged event for this node 
    return s; 
} 

WatchManager.triggerWatch

Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {  
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); // build a WatchedEvent from the event type, connection state and node path 
    HashSet<Watcher> watchers;  
    synchronized (this) {  
        watchers = watchTable.remove(path); // remove the path from the watch table and get back the set of watchers for it 
        if (watchers == null || watchers.isEmpty()) {  
            if (LOG.isTraceEnabled()) {  
                ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,  "No watchers for " + path);  
            }  
            return null;  
        }  
        for (Watcher w : watchers) { // iterate over the watcher set 
            HashSet<String> paths = watch2Paths.get(w); // look up the path set registered for this watcher 
            if (paths != null) {  
                paths.remove(path); // remove the path  
            }  
        }  
    }  
    for (Watcher w : watchers) { // iterate over the watcher set 
        if (supress != null && supress.contains(w)) {  
            continue;  
        }  
        w.process(e); // here is the key call: what does w.process do?  
    }  
    return watchers;  
}  
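
Note the watchTable.remove(path) above: server-side watches are one-shot as well. Once triggered, the watcher is removed from the table, and the client must re-register (as our sample Test.process does with exists(event.getPath(), true)) to keep receiving events.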

w.process(e);

Remember what the watcher bound on the server side actually is? It is the ServerCnxn. So w.process(e) really calls ServerCnxn.process. ServerCnxn is an abstract class with two implementations, NIOServerCnxn and NettyServerCnxn; let's open up NIOServerCnxn's process method:

@Override
synchronized public void process(WatchedEvent event) {
    ReplyHeader h = new ReplyHeader(-1, -1L, 0);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                 "Deliver event " + event + " to 0x"
                                 + Long.toHexString(this.sessionId)
                                 + " through " + this);
    }

    // Convert WatchedEvent to a type that can be sent over the wire
    WatcherEvent e = event.getWrapper();

    sendResponse(h, e, "notification");//look, 这个地方发送了一个事件,事件对象为 WatcherEvent。完美 
}

The client then receives this response (note the xid of -1 in the ReplyHeader above, the notification marker) and handles it in SendThread.readResponse.

SendThread.readResponse

This code was already shown above, so we only walk through the branch relevant to this flow. As noted, a notification carries an xid of -1, so we go straight to the xid == -1 branch:

void readResponse(ByteBuffer incomingBuffer) throws IOException {
    // ... header deserialization and the xid == -2 / -4 branches are identical
    // to the listing shown earlier and are omitted here ...
    if (replyHdr.getXid() == -1) {
        // -1 means notification
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got notification sessionid:0x"
                      + Long.toHexString(sessionId));
        }
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response"); // deserialize the WatcherEvent sent by the server 

        // convert from a server path to a client path
        if (chrootPath != null) {
            String serverPath = event.getPath();
            if(serverPath.compareTo(chrootPath)==0)
                event.setPath("/");
            else if (serverPath.length() > chrootPath.length())
                event.setPath(serverPath.substring(chrootPath.length()));
            else {
                LOG.warn("Got server path " + event.getPath()
                         + " which is too short for chroot path "
                         + chrootPath);
            }
        }

        WatchedEvent we = new WatchedEvent(event); // wrap it in a WatchedEvent 
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got " + we + " for sessionid 0x"
                      + Long.toHexString(sessionId));
        }

        eventThread.queueEvent( we ); // hand the event to the EventThread 
        return;
    }
    // ... the remaining packet handling is identical to the earlier listing ...
}

eventThread.queueEvent

When the SendThread receives a notification from the server, it calls EventThread.queueEvent to pass the event to the EventThread. queueEvent looks up all Watchers related to the notification in ZKWatchManager and removes them as it does so, since each watcher only fires once:

private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) { 
    if (event.getType() == EventType.None && sessionState == event.getState()) { // check the type 
        return; 
    } 
    sessionState = event.getState();  
    final Set<Watcher> watchers; 
    if (materializedWatchers == null) { 
        // materialize the watchers based on the event 
        watchers = watcher.materialize(event.getState(), 
                                       event.getType(), event.getPath()); 
    } else { 

        watchers = new HashSet<Watcher>(); 
        watchers.addAll(materializedWatchers); 
    } 

    // wrap the watchers and the event in a WatcherSetEventPair and add it to the waitingEvents queue 
    WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event); 
    // queue the pair (watch set & event) for later processing 
    waitingEvents.add(pair); 
} 
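
WatcherSetEventPair itself is just a small holder pairing the matched watchers with the event; roughly (abridged from the sources):

private static class WatcherSetEventPair {
    private final Set<Watcher> watchers;
    private final WatchedEvent event;

    public WatcherSetEventPair(Set<Watcher> watchers, WatchedEvent event) {
        this.watchers = watchers;
        this.event = event;
    }
}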

The materialize method

materialize removes the matching watch from dataWatches, existWatches or childWatches, which shows that a client-side watch is also deregistered once it has fired. Based on keeperState, eventType and path, it returns the set of Watchers that should be notified:

@Override
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                Watcher.Event.EventType type,
                                String clientPath)
{
    Set<Watcher> result = new HashSet<Watcher>();

    switch (type) {
        case None:
            result.add(defaultWatcher);
            boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
                state != Watcher.Event.KeeperState.SyncConnected;

            synchronized(dataWatches) {
                for(Set<Watcher> ws: dataWatches.values()) {
                    result.addAll(ws);
                }
                if (clear) {
                    dataWatches.clear();
                }
            }

            synchronized(existWatches) {
                for(Set<Watcher> ws: existWatches.values()) {
                    result.addAll(ws);
                }
                if (clear) {
                    existWatches.clear();
                }
            }

            synchronized(childWatches) {
                for(Set<Watcher> ws: childWatches.values()) {
                    result.addAll(ws);
                }
                if (clear) {
                    childWatches.clear();
                }
            }

            return result;
        case NodeDataChanged:
        case NodeCreated:
            synchronized (dataWatches) {
                addTo(dataWatches.remove(clientPath), result);
            }
            synchronized (existWatches) {
                addTo(existWatches.remove(clientPath), result);
            }
            break;
        case NodeChildrenChanged:
            synchronized (childWatches) {
                addTo(childWatches.remove(clientPath), result);
            }
            break;
        case NodeDeleted:
            synchronized (dataWatches) {
                addTo(dataWatches.remove(clientPath), result);
            }
            // XXX This shouldn't be needed, but just in case
            synchronized (existWatches) {
                Set<Watcher> list = existWatches.remove(clientPath);
                if (list != null) {
                    addTo(list, result);
                    LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                }
            }
            synchronized (childWatches) {
                addTo(childWatches.remove(clientPath), result);
            }
            break;
        default:
            String msg = "Unhandled watch event type " + type
                + " with state " + state + " on path " + clientPath;
            LOG.error(msg);
            throw new RuntimeException(msg);
    }

    return result;
}

waitingEvents.add

One last step, and we are at the bottom of it.

waitingEvents is the blocking queue inside EventThread, the thread that was created back in the very first step. As the name suggests, it holds the Watcher notifications waiting to be processed; EventThread's run() method keeps taking entries off the queue and hands them to processEvent:

@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
    try {
        isRunning = true;
        while (true) {
            Object event = waitingEvents.take();
            if (event == eventOfDeath) {
                wasKilled = true;
            } else {
                processEvent(event);
            }
            if (wasKilled)
                synchronized (waitingEvents) {
                    if (waitingEvents.isEmpty()) {
                        isRunning = false;
                        break;
                    }
                }
        }
    } catch (InterruptedException e) {
        LOG.error("Event thread exiting due to interruption", e);
    }

    LOG.info("EventThread shut down for session: 0x{}",
             Long.toHexString(getSessionId()));
}

processEvent

This method is long, so only the core code that dispatches a triggered event is shown here:

private void processEvent(Object event) { 
    try { 
        if (event instanceof WatcherSetEventPair) { // check the event type 
            // each watcher will process the event 
            WatcherSetEventPair pair = (WatcherSetEventPair) event; // get the WatcherSetEventPair 
            for (Watcher watcher : pair.watchers) { // all watchers matched for this trigger, invoked in a loop 
                try { 
                    watcher.process(pair.event); // invoke the client's process callback 
                } catch (Throwable t) { 
                    LOG.error("Error while calling watcher ", t); 
                } 
            } 
        } 
        // ... handling of other event types omitted ... 
    } catch (Throwable t) { 
        LOG.error("Caught unexpected throwable", t); 
    } 
} 

How the Server Receives the Data Request

Where does the server receive the packet in the first place? As analyzed earlier, ZooKeeper builds a connection factory at startup using the following code:

ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory(); 

This yields a NIOServerCnxnFactory, which extends Thread, so once started its run method loops, accepting client requests and dispatching them.

NIOServerCnxnFactory.run

public void run() { 
    while (!ss.socket().isClosed()) { 
        try { 
            // ... select() and collect the ready keys into selectedList ... 
            for (SelectionKey k : selectedList) { 
                if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { 
                    // accept a new client connection 
                } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { 
                    // handle a read/write request from the client 
                    NIOServerCnxn c = (NIOServerCnxn) k.attachment(); 
                    c.doIO(k); // perform the IO 
                } else { 
                    if (LOG.isDebugEnabled()) { 
                        LOG.debug("Unexpected ops in select " + k.readyOps()); 
                    } 
                } 
            } 
            selected.clear(); 
        } catch (RuntimeException e) { 
            LOG.warn("Ignoring unexpected runtime exception", e); 
        } catch (Exception e) { 
            LOG.warn("Ignoring exception", e); 
        } 
    } 
    closeAll(); 
    LOG.info("NIOServerCnxn factory exited run method"); 
} 

NIOServerCnxn.doIO

void doIO(SelectionKey k) throws InterruptedException { 
    try { 
        // ... part of the code omitted ... 
        if (k.isReadable()) { // handle a read, i.e. receive data 
            // the code in between parses the incoming buffer and deals with packet framing 
            if (isPayload) { // not the case for 4letterword 
                readPayload(); // process the payload 
            } else { 
                // four letter words take care 
                // need not do anything else 
                return; 
            } 
        } 
    } catch (IOException e) { 
        // ... exception handling omitted ... 
    } 
} 

NIOServerCnxn.readRequest

Reads the client's request and processes it:

private void readRequest() throws IOException { 
    zkServer.processPacket(this, incomingBuffer); 
} 

ZooKeeperServer.processPacket

This method handles packets differently according to their type; for read/write requests, the block below is the part to focus on:

Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), 
                         h.getType(), incomingBuffer, cnxn.getAuthInfo()); 
si.setOwner(ServerCnxn.me); 
submitRequest(si); 

The rest of the flow was already covered in the source analysis above and is not repeated here.

Processing Flow in Cluster Mode

In cluster mode the ZAB protocol comes into play, so the processing flow is considerably more complex; the diagram below can be used to locate the corresponding code paths.

(Figure: request processing flow in cluster mode; image not included)
