跳至主要內容

zookeeper中监听原理源码

soulballad · 分布式 · Zookeeper · Zookeeper · 约 4015 字 · 大约 13 分钟

目录

  1. zookeeper 初始化,并调用 exists方法
  1. 服务端注册监听:
    接下来要看之前的 cnxnFactory
  1. 客户端注册监听:
    服务端发送返回结果后,客户端需要读取;前面的“ClientCnxnSocketNIO.doIO”中,只看了“isWritable”,实际上还有“isReadable”部分
  1. 如何触发监听事件?
    在FinalRequestProcessor中如果接收到一个事务请求 setData/delete/create,会执行下面的逻辑:

new ZooKeeper

在创建一个 ZooKeeper 客户端对象实例时,我们通过 new Watcher() 向构造方法中传入一个默认的 Watcher,这个 Watcher 将作为整个 ZooKeeper 会话期间的默认 Watcher,会一直被保存在客户端 ZKWatchManager 的 defaultWatcher 中;代码如下:

/**
 * Creates a ZooKeeper client handle.
 *
 * The supplied watcher becomes the session-wide default watcher (stored in
 * watchManager.defaultWatcher). The connect string is parsed, a ClientCnxn is
 * built for the resulting server list, and its threads are started.
 *
 * @param connectString  server address string, parsed by ConnectStringParser into
 *                       server addresses and a chroot path
 * @param sessionTimeout requested session timeout, forwarded to ClientCnxn
 * @param watcher        default watcher for the whole session
 * @param canBeReadOnly  forwarded to ClientCnxn; later sent as the readOnly flag of
 *                       the connect request
 * @throws IOException if the client socket implementation cannot be instantiated
 */
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly) throws IOException {
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
    // The constructor's watcher becomes the default watcher used by exists(path, true) etc.
    watchManager.defaultWatcher = watcher;
    // Parse connectString into the server address list and the chroot path
    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
    // Client-side networking component (owns SendThread and EventThread)
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    cnxn.start();
}

new ClientCnxn

ClientCnxn:是 Zookeeper 客户端和 Zookeeper 服务器端进行通信和事件通知处理的主要类,它内部包含两个线程类:

  1. SendThread :负责客户端和服务器端的数据通信, 也包括事件信息的传输
  2. EventThread : 主要在客户端回调注册的 Watchers 进行通知处理
/**
 * Creates the client connection object that owns the two client threads.
 *
 * Stores the session parameters, derives the connect and read timeouts from the
 * session timeout, and constructs (but does not start) the SendThread and the
 * EventThread; they are started by start().
 *
 * NOTE(review): the ZooKeeper constructor passes 7 arguments to ClientCnxn,
 * while this overload takes 9 (it adds sessionId and sessionPasswd). Presumably a
 * shorter overload delegates here with default session values — confirm against
 * the upstream source.
 *
 * @param chrootPath       chroot prefix parsed from the connect string
 * @param hostProvider     candidate servers to connect to
 * @param sessionTimeout   requested session timeout (milliseconds)
 * @param zooKeeper        the owning client handle
 * @param watcher          client-side watch manager
 * @param clientCnxnSocket transport implementation (NIO by default)
 * @param sessionId        session id
 * @param sessionPasswd    session password
 * @param canBeReadOnly    whether the client accepts a read-only server
 */
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
        ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;

    // Split the session timeout evenly across all servers for connection attempts
    connectTimeout = sessionTimeout / hostProvider.size();
    // Reads time out after two thirds of the session timeout
    readTimeout = sessionTimeout * 2 / 3;
    readOnly = canBeReadOnly;

    // SendThread: network I/O with the server; EventThread: watcher/callback dispatch
    sendThread = new SendThread(clientCnxnSocket);
    eventThread = new EventThread();

}

ZooKeeper.getClientCnxnSocket

/**
 * Instantiates the client transport implementation.
 *
 * The class name is read from the system property named by
 * ZOOKEEPER_CLIENT_CNXN_SOCKET. When the property is unset,
 * ClientCnxnSocketNIO is used. The class is loaded reflectively and created
 * through its no-arg constructor.
 *
 * @return a new ClientCnxnSocket instance
 * @throws IOException if the class cannot be loaded or instantiated; the
 *         reflective failure is attached as the cause
 */
private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
    String clientCnxnSocketName = System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
    if (clientCnxnSocketName == null) {
        // Default to the NIO implementation
        clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
    }
    try {
        return (ClientCnxnSocket) Class.forName(clientCnxnSocketName).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
        // Wrap any reflection failure so callers only have to handle IOException
        IOException ioe = new IOException("Couldn't instantiate " + clientCnxnSocketName);
        ioe.initCause(e);
        throw ioe;
    }
}

ClientCnxn.start

/**
 * Starts the client I/O thread (SendThread) and then the event-dispatch
 * thread (EventThread).
 */
public void start() {
    sendThread.start();
    eventThread.start();
}

SendThread.run

/**
 * Main loop of the client I/O thread.
 *
 * While the connection state is alive, the loop sends a ping when one is due.
 * It then lets the socket implementation perform one round of I/O
 * (doTransport).
 *
 * After the loop exits, it cleans up and closes the socket. If the state is
 * still alive at that point, it queues a Disconnected event for the EventThread.
 */
@Override
public void run() {
    clientCnxnSocket.introduce(this,sessionId);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = Time.currentElapsedTime();
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    InetSocketAddress serverAddress = null;
    while (state.isAlive()) {
        try {
            // ... (elided in this article; the elided code must also assign `to`,
            // which is read below)
            if (state.isConnected()) {
                //1000(1 second) is to prevent race condition missing to send the second ping
                //also make sure not to send too many pings when readTimeout is small 
                int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - 
                        ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                    sendPing();
                    clientCnxnSocket.updateLastSend();
                } else {
                    // Wake up from select no later than the next ping deadline
                    if (timeToNextPing < to) {
                        to = timeToNextPing;
                    }
                }
            }

            // ...
            // One round of selector I/O: writes queued packets and reads replies
            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
        } catch (Throwable e) {
            // ...
        }
    }
    cleanup();
    clientCnxnSocket.close();
    if (state.isAlive()) {
        eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null)); }
    ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(), "SendThread exited loop for session: 0x"+ Long.toHexString(getSessionId())); 
}

SendThread.sendPing

/**
 * Queues a ping (heartbeat) request.
 *
 * Records the send time, then queues a header-only packet with xid -2 and type
 * OpCode.ping. When the packet is written, doIO() keeps this xid rather than
 * assigning a new one. After sending, doIO() does not add ping packets to
 * pendingQueue.
 */
private void sendPing() {
    lastPingSentNs = System.nanoTime();
    RequestHeader h = new RequestHeader(-2, OpCode.ping);
    queuePacket(h, null, null, null, null, null, null, null, null);
}

ClientCnxn.queuePacket

封装一个 packet 并添加到 outgoingQueue 队列中(连接已关闭或正在关闭时,则交给 conLossPacket 处理)

/**
 * Wraps a request in a Packet and appends it to outgoingQueue.
 *
 * If the connection is no longer alive or is already closing, the packet is
 * handed to conLossPacket() instead of being queued. A closeSession request
 * marks the connection as closing. Finally the socket is woken up so that
 * SendThread can send the packet.
 *
 * @param h                 request header
 * @param r                 reply header, filled in when the reply arrives
 * @param request           request body, may be null
 * @param response          record the reply body is deserialized into, may be null
 * @param cb                async callback; null for synchronous calls (submitRequest)
 * @param clientPath        path as seen by the client
 * @param serverPath        path with the chroot prefix applied
 * @param ctx               context object for the callback
 * @param watchRegistration watch to register once the reply arrives, may be null
 * @return the created packet (synchronous callers wait on it)
 */
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) {Packet packet = null;

    // Note that we do not generate the Xid for the packet yet. It is
    // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
    // where the packet is actually sent.
    synchronized (outgoingQueue) {
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            // If the client is asking to close the session then
            // mark as closing
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            outgoingQueue.add(packet);
        }
    }
    // Interrupt a blocking select so the new packet is sent promptly
    sendThread.getClientCnxnSocket().wakeupCnxn();
    return packet;
}

ClientCnxnSocketNIO.doTransport

/**
 * Performs one round of NIO I/O for the client.
 *
 * Waits up to waitTimeOut milliseconds in select(), then handles each ready key:
 * - a connectable key finishes the connect and primes the connection;
 * - a readable or writable key is delegated to doIO().
 *
 * Finally, when connected, write interest is re-enabled if outgoingQueue still
 * holds a sendable packet.
 *
 * @param waitTimeOut   maximum time to block in select()
 * @param pendingQueue  packets that have been sent and await a reply
 * @param outgoingQueue packets waiting to be sent
 * @param cnxn          the owning connection
 */
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
                 ClientCnxn cnxn) throws IOException, InterruptedException {
    selector.select(waitTimeOut);
    Set<SelectionKey> selected;
    synchronized (this) {
        selected = selector.selectedKeys();
    }
    // Everything below and until we get back to the select is
    // non blocking, so time is effectively a constant. That is
    // Why we just have to do this once, here
    updateNow();
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            // TCP connect completed: send the initial connect request
            if (sc.finishConnect()) {
                updateLastSendAndHeard();
                sendThread.primeConnection();
            }
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0){
            doIO(pendingQueue, outgoingQueue, cnxn);
        }
    }
    if (sendThread.getZkState().isConnected()) {
        synchronized(outgoingQueue) {
            if (findSendablePacket(outgoingQueue,
                    cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                enableWrite();
            }
        }
    }
    selected.clear();
}

ClientCnxnSocketNIO.doIO

/**
 * Handles readiness on the client socket.
 *
 * Read side: fills incomingBuffer. When the buffer is full, its contents are one of:
 * - the 4-byte length prefix, handled by readLength();
 * - the connect response, before initialization, handled by readConnectResult();
 * - a regular reply, passed to SendThread.readResponse().
 *
 * Write side: sends the first sendable packet in outgoingQueue.
 * - An xid is assigned first, except for ping and auth packets.
 * - Once the packet is fully written, it is removed from outgoingQueue.
 * - Except for ping and auth, it is then added to pendingQueue to await its reply.
 * - Write interest is finally adjusted depending on what is left to send.
 *
 * @throws IOException if the socket is null; EndOfStreamException is thrown
 *         when the server has closed the socket
 */
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
  throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel) sockKey.channel();
    if (sock == null) {
        throw new IOException("Socket is null!");
    }
    // read: replies from the server
    if (sockKey.isReadable()) {
        int rc = sock.read(incomingBuffer);
        if (rc < 0) {
            throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"+ Long.toHexString(sessionId) + ", likely server has closed socket"); }
        if (!incomingBuffer.hasRemaining()) {
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                // Length prefix complete; readLength() presumably sizes incomingBuffer for the body
                recvCount++;
                readLength();
            } else if (!initialized) {
                // First payload on a new connection is the connect response
                readConnectResult();
                enableRead();
                if (findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                    enableWrite();
                }
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
                initialized = true;
            } else {
                // Regular reply; readResponse() ends in finishPacket(), which registers watches
                sendThread.readResponse(incomingBuffer);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
            }
        }
    }
    // write: requests to the server
    if (sockKey.isWritable()) {
        synchronized(outgoingQueue) {
            Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress());
            if (p != null) {
                updateLastSend();
                // If we already started writing p, p.bb will already exist
                if (p.bb == null) {
                    if ((p.requestHeader != null) &&
                            (p.requestHeader.getType() != OpCode.ping) &&
                            (p.requestHeader.getType() != OpCode.auth)) {
                        p.requestHeader.setXid(cnxn.getXid());
                    }
                    p.createBB();
                }
                sock.write(p.bb);
                if (!p.bb.hasRemaining()) {
                    // Whole packet written; ping/auth packets are not tracked in pendingQueue
                    sentCount++;
                    outgoingQueue.removeFirstOccurrence(p);
                    if (p.requestHeader != null
                            && p.requestHeader.getType() != OpCode.ping
                            && p.requestHeader.getType() != OpCode.auth) {
                        synchronized (pendingQueue) {
                            pendingQueue.add(p);
                        }
                    }
                }
            }
            if (outgoingQueue.isEmpty()) {
                disableWrite();
            } else if (!initialized && p != null && !p.bb.hasRemaining()) {
                // Before the handshake completes, pause writing after a full packet
                // (presumably the connect request) until the connect result is read
                disableWrite();
            } else {
                enableWrite();
            }
        }
    }
}

ClientCnxn.Packet.createBB

数据包依次包含:4 字节的长度前缀、requestHeader.serialize(boa, "header") 序列化出的请求头,以及 request.serialize(boa, "request") 序列化出的请求体

/**
 * Serializes this packet into bb using jute.
 *
 * Layout:
 * - a 4-byte length placeholder;
 * - the request header, if any;
 * - then either the ConnectRequest followed by the readOnly flag, or the
 *   generic request body, if any.
 *
 * The placeholder is then overwritten with the total length minus the 4 prefix
 * bytes, and bb is rewound for writing. An IOException during serialization is
 * logged and ignored; in that case bb is not assigned.
 */
public void createBB() {
try {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
    boa.writeInt(-1, "len"); // We'll fill this in later
    // jute: ZooKeeper's own serialization format
    if (requestHeader != null) {
        requestHeader.serialize(boa, "header");
    }
    if (request instanceof ConnectRequest) {
        request.serialize(boa, "connect");
        // append "am-I-allowed-to-be-readonly" flag
        boa.writeBool(readOnly, "readOnly");
    } else if (request != null) {
        request.serialize(boa, "request");
    }
    baos.close();
    this.bb = ByteBuffer.wrap(baos.toByteArray());
    // Patch the length prefix: payload size excluding the 4-byte prefix itself
    this.bb.putInt(this.bb.capacity() - 4);
    this.bb.rewind();
} catch (IOException e) {
    LOG.warn("Ignoring unexpected exception", e);
}
}

EventThread.run

/**
 * Main loop of the client event thread.
 *
 * Blocks on waitingEvents and processes each event in order. After
 * eventOfDeath is taken, the thread keeps draining the queue and exits once it
 * is empty. An InterruptedException is logged and ends the loop.
 */
@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
   try {
      isRunning = true;
      while (true) {
          // take() blocks until an event is available
         Object event = waitingEvents.take();
         if (event == eventOfDeath) {
            wasKilled = true;
         } else {
            processEvent(event);
         }
         // Once killed, exit only after every remaining event has been processed
         if (wasKilled)
            synchronized (waitingEvents) {
               if (waitingEvents.isEmpty()) {
                  isRunning = false;
                  break;
               }
            }
      }
   } catch (InterruptedException e) {
      LOG.error("Event thread exiting due to interruption", e);
   }

    LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId()));
}

EventThread.processEvent

private void processEvent(Object event) {
      try {
          if (event instanceof WatcherSetEventPair) {
              // each watcher will process the event
              WatcherSetEventPair pair = (WatcherSetEventPair) event;
              for (Watcher watcher : pair.watchers) {
                  try {
                      watcher.process(pair.event);
                  } catch (Throwable t) {
                      LOG.error("Error while calling watcher ", t);
                  }
              }
          } else {
              // ...
          }
      } catch (Throwable t) {
          LOG.error("Caught unexpected throwable", t);
      }
   }
}

ZooKeeper.exists

/**
 * Checks whether a node exists, optionally leaving the default watcher on it.
 *
 * @param path  node path
 * @param watch if true, the session's default watcher (the one passed to the
 *              ZooKeeper constructor) is registered; if false, no watch is set
 * @return the node's Stat, or null if the node does not exist
 * @throws KeeperException      on a server error other than "no node"
 * @throws InterruptedException if interrupted while waiting for the reply
 */
public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException{
    // defaultWatcher was set by the ZooKeeper constructor
    return exists(path, watch ? watchManager.defaultWatcher : null);
}

ZooKeeper.exists(String, Watcher)

/**
 * Synchronously checks whether a node exists and optionally sets a watch.
 *
 * When a watcher is given, an ExistsWatchRegistration is built for the client
 * path. An exists request for the chroot-prefixed server path is then sent,
 * with the watch flag set when a watcher was supplied, and the call blocks
 * until the reply arrives.
 *
 * The watch is stored on the client side only after the reply has been
 * processed, in finishPacket / WatchRegistration.register.
 *
 * @param path    node path as seen by the client
 * @param watcher watcher to register, or null for none
 * @return the node's Stat, or null when the server answers NONODE or czxid is -1
 * @throws KeeperException      for any other non-zero server error code
 * @throws InterruptedException if interrupted while waiting for the reply
 */
public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException {final String clientPath = path;
    PathUtils.validatePath(clientPath);

    // the watch contains the un-chroot path
    WatchRegistration wcb = null;
    if (watcher != null) {
        wcb = new ExistsWatchRegistration(watcher, clientPath);
    }

    final String serverPath = prependChroot(clientPath);

    RequestHeader h = new RequestHeader();
    // The server dispatches on this type (processPacket / FinalRequestProcessor)
    h.setType(ZooDefs.OpCode.exists);
    ExistsRequest request = new ExistsRequest();
    request.setPath(serverPath); //path
    request.setWatch(watcher != null); // true only when a watcher was supplied
    // NOTE(review): SetDataResponse is used as the reply record; presumably it has the
    // same wire layout (a Stat) as ExistsResponse — confirm against upstream
    SetDataResponse response = new SetDataResponse();
    // cnxn was created in the ZooKeeper constructor; this call blocks until the reply arrives
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (r.getErr() != 0) {
        if (r.getErr() == KeeperException.Code.NONODE.intValue()) {
            return null;
        }
        throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
    }

    return response.getStat().getCzxid() == -1 ? null : response.getStat();
}

ClientCnxn.submitRequest

/**
 * Queues a request and blocks the caller until its reply has been processed.
 *
 * The packet is queued without a callback. finishPacket() therefore sets
 * packet.finished and calls notifyAll() on the packet, which releases the
 * wait below.
 *
 * @return the reply header, filled in by readResponse()
 * @throws InterruptedException if interrupted while waiting
 */
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration);
    synchronized (packet) {
        // Loop guards against spurious wakeups
        while (!packet.finished) {
            packet.wait();
        }
    }
    return r;
}

NIOServerCnxnFactory.run

/**
 * Server-side NIO selector loop; client requests (create/delete/setData,
 * exists, ...) enter the server here.
 *
 * Until the server socket is closed, the loop:
 * - selects with a 1 second timeout;
 * - shuffles the ready keys;
 * - for read/write-ready keys, calls doIO() on the attached NIOServerCnxn.
 *
 * Exceptions are logged and the loop continues. On exit all connections are
 * closed.
 */
public void run() {
    while (!ss.socket().isClosed()) {
        try {
            selector.select(1000);
            Set<SelectionKey> selected;
            synchronized (this) {
                selected = selector.selectedKeys();
            }
            ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
                    selected);
            // Shuffle so connections are not always served in the same order
            Collections.shuffle(selectedList);
            for (SelectionKey k : selectedList) {
                if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                    // ...
                // read or write readiness
                } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                    NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                    c.doIO(k);
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Unexpected ops in select "+ k.readyOps()); 
                    }
                }
            }
            selected.clear();
        } catch (RuntimeException e) {
            LOG.warn("Ignoring unexpected runtime exception", e);
        } catch (Exception e) {
            LOG.warn("Ignoring exception", e);
        }
    }
    closeAll();
    LOG.info("NIOServerCnxn factory exited run method");
}

NIOServerCnxn.doIO

/**
 * Handles readiness on one server-side client connection.
 *
 * Read side: reads into incomingBuffer. Once the 4-byte length prefix is
 * complete, it is decoded by readLength(k), which returns false for
 * four-letter-word commands. The payload is then read via readPayload().
 *
 * Write side:
 * - copies as much of outgoingBuffers as fits into factory.directBuffer and writes it;
 * - drops buffers that were fully sent, advancing the position of a partially sent one;
 * - updates OP_WRITE interest.
 *
 * Throws CloseRequestException when the closeConn marker is reached, or after
 * answering an info probe on an uninitialized connection.
 */
void doIO(SelectionKey k) throws InterruptedException {
    try {
        if (isSocketOpen() == false) {
            LOG.warn("trying to do i/o on a null socket for session:0x"+ Long.toHexString(sessionId));
            return;
        }
        if (k.isReadable()) {
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException("Unable to read additional data from client sessionid 0x"+ Long.toHexString(sessionId) + ", likely client has closed socket");
            }
            if (incomingBuffer.remaining() == 0) {
                boolean isPayload;
                if (incomingBuffer == lenBuffer) { // start of next request
                    incomingBuffer.flip();
                    isPayload = readLength(k);
                    incomingBuffer.clear();
                } else {
                    // continuation
                    isPayload = true;
                }
                if (isPayload) { // not the case for 4letterword
                    readPayload();
                }
                else {
                    return;
                }
            }
        }
        if (k.isWritable()) {
            if (outgoingBuffers.size() > 0) {
                
                ByteBuffer directBuffer = factory.directBuffer;
                directBuffer.clear();

                for (ByteBuffer b : outgoingBuffers) {
                    if (directBuffer.remaining() < b.remaining()) {                
                        // Take only the part that fits; slice() leaves b's own position untouched
                        b = (ByteBuffer) b.slice().limit(directBuffer.remaining());
                    }
                   
                    // put() advances b; restore its position because buffers are only
                    // consumed below, after the actual number of bytes sent is known
                    int p = b.position();
                    directBuffer.put(b);
                    b.position(p);
                    if (directBuffer.remaining() == 0) {
                        break;
                    }
                }
                
                directBuffer.flip();

                int sent = sock.write(directBuffer);
                ByteBuffer bb;

                // Remove the buffers that we have sent
                while (outgoingBuffers.size() > 0) {
                    bb = outgoingBuffers.peek();
                    if (bb == ServerCnxnFactory.closeConn) {
                        throw new CloseRequestException("close requested");
                    }
                    int left = bb.remaining() - sent;
                    if (left > 0) {                       
                        // Partially sent: advance past the bytes written and stop
                        bb.position(bb.position() + sent);
                        break;
                    }
                    packetSent();                    
                    sent -= bb.remaining();
                    outgoingBuffers.remove();
                }                
            }

            synchronized(this.factory){
                if (outgoingBuffers.size() == 0) {
                    if (!initialized && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
                        throw new CloseRequestException("responded to info probe");
                    }
                    sk.interestOps(sk.interestOps() & (~SelectionKey.OP_WRITE)); 
                } else {
                    sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); 
                }
            }
        }
    } catch (CancelledKeyException e) {
        // ...
    }
}

NIOServerCnxn.readPayload

/**
 * Completes reading a request body and dispatches it.
 *
 * If the body is still incomplete, more bytes are read. Once it is complete,
 * the first payload of a connection is handled as the connect request and
 * later payloads as regular requests. The buffers are then reset to read the
 * next length prefix.
 *
 * @throws IOException on socket errors; EndOfStreamException is thrown when
 *         the client has closed the socket
 */
private void readPayload() throws IOException, InterruptedException {
    if (incomingBuffer.remaining() != 0) { // have we read length bytes?
        int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
        if (rc < 0) {
            throw new EndOfStreamException("Unable to read additional data from client sessionid 0x"+ Long.toHexString(sessionId) + ", likely client has closed socket"); 
        }
    }

    if (incomingBuffer.remaining() == 0) { // have we read length bytes?
        packetReceived();
        incomingBuffer.flip();
        if (!initialized) {
            readConnectRequest();
        } else {
            readRequest();
        }
        // Prepare to read the next request's length prefix
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
}

NIOServerCnxn.readRequest

/**
 * Hands a completely read request buffer to the ZooKeeperServer for processing.
 */
private void readRequest() throws IOException {
    zkServer.processPacket(this, incomingBuffer);
}

ZooKeeperServer.processPacket

/**
 * Decodes a request header and routes the request.
 *
 * auth packets are handled inline (elided). sasl packets are answered directly
 * via processSasl(). Every other type, including exists, is wrapped in a
 * Request and passed to submitRequest().
 *
 * The connection's outstanding-request count is incremented at the end. That
 * step is not reached for auth and sasl packets, which return early.
 */
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    // We have the request, now process and setup for next
    InputStream bais = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    /*
     * For exists, the client built the header with
     * h.setType(ZooDefs.OpCode.exists), so the type read here is OpCode.exists.
     */
    RequestHeader h = new RequestHeader();
    // Deserialize the request header
    h.deserialize(bia, "header");
    // The remaining bytes (after the header) are the request body
    incomingBuffer = incomingBuffer.slice();
    if (h.getType() == OpCode.auth) {
        // ...
        return;
    } else {
        if (h.getType() == OpCode.sasl) {
            Record rsp = processSasl(incomingBuffer,cnxn);
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
            cnxn.sendResponse(rh,rsp, "response"); // third arg is the serialization tag for the record
            return;
        } else {
            // exists (and every other regular request) takes this branch
            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
              h.getType(), incomingBuffer, cnxn.getAuthInfo());
            si.setOwner(ServerCnxn.me);
            submitRequest(si);
        }
    }
    cnxn.incrOutstandingRequests(h);
}

ZooKeeperServer.submitRequest

/**
 * Passes a request into the request-processor chain.
 *
 * If the chain is not set up yet, the method waits in 1-second steps while the
 * server state is INITIAL. It then throws RuntimeException("Not started")
 * unless there is a firstProcessor and the state is RUNNING.
 *
 * Valid request types go to firstProcessor, the head of the chain built by
 * setupRequestProcessors(). Unknown types go to UnimplementedRequestProcessor.
 * A MissingSessionException causes the request to be dropped.
 */
public void submitRequest(Request si) {
    if (firstProcessor == null) {
        synchronized (this) {
            try {              
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (firstProcessor == null || state != State.RUNNING) {
                throw new RuntimeException("Not started");
            }
        }
    }
    try {
        // Presumably refreshes the session's timeout — confirm against upstream
        touch(si.cnxn);
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type " + si.type);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dropping request: " + e.getMessage());
        }
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request:" + e.getMessage(), e);
    }
}

ZooKeeperServer.setupRequestProcessors

/**
 * Builds the standalone request-processor chain
 * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor.
 * The chain is constructed back to front; the two thread-based processors are
 * started here.
 */
protected void setupRequestProcessors() {
    // Chain of responsibility: PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
    // Start the SyncRequestProcessor thread
    ((SyncRequestProcessor)syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    // Start the PrepRequestProcessor thread
    ((PrepRequestProcessor)firstProcessor).start();
}

FollowerZooKeeperServer.setupRequestProcessors

/**
 * Builds the follower's two request-processor chains:
 * - FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor
 *   (firstProcessor, used by submitRequest);
 * - SyncRequestProcessor -> SendAckRequestProcessor (syncProcessor).
 * All thread-based processors are started here.
 */
@Override
protected void setupRequestProcessors() {
    // Chain of responsibility: a follower has two chains
    // FollowerRequestProcessor->CommitProcessor->FinalRequestProcessor
    // SyncRequestProcessor->SendAckRequestProcessor
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor,
            Long.toString(getServerId()), true,
            getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
    ((FollowerRequestProcessor) firstProcessor).start();
    syncProcessor = new SyncRequestProcessor(this,
            new SendAckRequestProcessor((Learner)getFollower()));
    syncProcessor.start();
}

LeaderZooKeeperServer.setupRequestProcessors

/**
 * Builds the leader's request-processor chain:
 * PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor
 * -> ToBeAppliedRequestProcessor -> FinalRequestProcessor.
 * The chain is constructed back to front. ProposalRequestProcessor is
 * initialized with initialize() rather than start().
 */
@Override
protected void setupRequestProcessors() {
    // Chain of responsibility: PrepRequestProcessor->ProposalRequestProcessor->CommitProcessor->ToBeAppliedRequestProcessor->FinalRequestProcessor
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor( finalProcessor, getLeader().toBeApplied);
    commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
    commitProcessor.start();
    ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
    proposalProcessor.initialize();
    firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}

ObserverZooKeeperServer.setupRequestProcessors

/**
 * Builds the observer's request-processor chain
 * ObserverRequestProcessor -> CommitProcessor -> FinalRequestProcessor.
 * A standalone SyncRequestProcessor with no next processor is added only when
 * syncRequestProcessorEnabled is set.
 */
@Override
protected void setupRequestProcessors() {      
    // Chain of responsibility: ObserverRequestProcessor->CommitProcessor->FinalRequestProcessor
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
    ((ObserverRequestProcessor) firstProcessor).start();

    if (syncRequestProcessorEnabled) {
        syncProcessor = new SyncRequestProcessor(this, null);
        syncProcessor.start();
    }
}

PrepRequestProcessor.processRequest

/**
 * Enqueues the request for the PrepRequestProcessor thread (see run()) and
 * returns immediately.
 */
public void processRequest(Request request) {
    // request.addRQRec(">prep="+zks.outstandingChanges.size());
    submittedRequests.add(request);
}

PrepRequestProcessor.run

/**
 * Thread loop of PrepRequestProcessor.
 *
 * Takes queued requests one at a time, optionally trace-logs them, and
 * processes them with pRequest(). The loop stops when Request.requestOfDeath
 * is taken.
 */
@Override
public void run() {
    try {
        while (true) {
            Request request = submittedRequests.take();
            long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
            if (request.type == OpCode.ping) { // heartbeat: trace with the ping mask
                traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
            }
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
            }
            // Sentinel used to shut the processor down
            if (Request.requestOfDeath == request) {
                break;
            }
            pRequest(request);
        }
    } catch (RequestProcessorException e) {
       // ...
    }
    LOG.info("PrepRequestProcessor exited loop!");
}

PrepRequestProcessor.pRequest

/**
 * Prepares a request and forwards it to the next processor.
 *
 * Clears hdr and txn, runs the type-specific handling (elided in this
 * article), stamps the request with the server's current zxid, and passes it
 * on.
 *
 * NOTE(review): presumably only write requests get a non-null hdr/txn in the
 * elided part; read requests such as exists keep hdr == null — confirm
 * against upstream.
 */
protected void pRequest(Request request) throws RequestProcessorException {
    request.hdr = null;
    request.txn = null;
    // ...
    request.zxid = zks.getZxid();
    nextProcessor.processRequest(request);
}

SyncRequestProcessor.processRequest

/**
 * Enqueues the request for the SyncRequestProcessor thread (see run()) and
 * returns immediately.
 */
public void processRequest(Request request) {
    // request.addRQRec(">sync");
    queuedRequests.add(request);
}

SyncRequestProcessor.run

/**
 * Thread loop of SyncRequestProcessor (heavily elided in this article).
 *
 * For each request si obtained in the elided part:
 * - if the ZKDatabase accepts it via append(), it is handled by the elided
 *   branch (presumably logging it);
 * - otherwise, if nothing is waiting to be flushed, it is passed straight to
 *   nextProcessor.
 *
 * Any throwable is reported through handleException() and stops the loop.
 *
 * NOTE(review): presumably append() returns false for read requests such as
 * exists — confirm against upstream.
 */
@Override
public void run() {
    try {
        int logCount = 0;
        // Presumably randomizes the snapshot point so servers do not all snapshot at once — confirm
        setRandRoll(r.nextInt(snapCount/2));
        while (true) {
            // ...
            if (si != null) {
                if (zks.getZKDatabase().append(si)) {
                    // ...
                } else if (toFlush.isEmpty()) {
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si);
                       // ...
                    }
                    continue;
                }
               // ...
            }
        }
    } catch (Throwable t) {
        handleException(this.getName(), t);
        running = false;
    }
    LOG.info("SyncRequestProcessor exited!");
}

FinalRequestProcessor.processRequest

/**
 * Last processor in the chain: executes the request and sends the reply.
 *
 * Only the exists case is shown. The request body is deserialized and the
 * path is rejected if it contains a NUL character. The node is then stat'ed.
 * When the client asked for a watch, the server-side connection (cnxn) is
 * registered as the watcher on that path.
 *
 * Finally the reply is sent on cnxn, followed by a close-session message if
 * required.
 */
public void processRequest(Request request) {
    // ...
    try {
        // ... (elided; the case below belongs to an elided switch on the request type)
        case OpCode.exists: {
            lastOp = "EXIS";
            // TODO we need to figure out the security requirement for this!
            ExistsRequest existsRequest = new ExistsRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
            // The request carries the watch flag used below
            String path = existsRequest.getPath();
            if (path.indexOf('\0') != -1) {
                throw new KeeperException.BadArgumentsException();
            }
            // cnxn is the server-side connection; it becomes the watcher when a watch was requested
            Stat stat = zks.getZKDatabase().statNode(path, existsRequest .getWatch() ? cnxn : null); 
            rsp = new ExistsResponse(stat);
            break;
        }
        
    } catch (SessionMovedException e) {
        // ...
    }

    //...
    try {
        cnxn.sendResponse(hdr, rsp, "response");
        if (closeSession) {
            cnxn.sendCloseSession();
        }
    } catch (IOException e) {
        LOG.error("FIXMSG",e);
    }
}

ZKDatabase.statNode

/**
 * Returns the stat of a node, delegating to DataTree.statNode().
 *
 * The ServerCnxn is passed where DataTree expects a Watcher, so the
 * connection itself acts as the server-side watcher.
 *
 * @param serverCnxn connection to register as watcher, or null for none
 * @throws KeeperException.NoNodeException if the node does not exist
 */
public Stat statNode(String path, ServerCnxn serverCnxn) throws KeeperException.NoNodeException {
    return dataTree.statNode(path, serverCnxn);
}

DataTree.statNode

/**
 * Returns a copy of a node's stat, optionally adding a data watch on the path.
 *
 * The watch is added before the existence check. An exists call with a watch
 * on a missing node therefore still leaves a watch that can fire later, for
 * example when the node is created.
 *
 * @param path    node path
 * @param watcher watcher to add to dataWatches, or null for none
 * @return a fresh Stat copied under the node's lock
 * @throws KeeperException.NoNodeException if the node does not exist
 */
public Stat statNode(String path, Watcher watcher) throws KeeperException.NoNodeException {
    Stat stat = new Stat();
    DataNode n = nodes.get(path);
    if (watcher != null) {
        dataWatches.addWatch(path, watcher);
    }
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (n) {
        n.copyStat(stat);
        return stat;
    }
}

WatchManager.addWatch

/**
 * Registers a watcher on a path, maintaining two indexes:
 * - watchTable: path -> watchers on that path;
 * - watch2Paths: watcher -> paths it watches.
 * Both use sets, so adding the same (path, watcher) pair twice has no further
 * effect.
 */
public synchronized void addWatch(String path, Watcher watcher) {
    HashSet<Watcher> list = watchTable.get(path);
    if (list == null) {   
        // On the server the watcher here is the ServerCnxn passed in from
        // zks.getZKDatabase().statNode(path, existsRequest .getWatch() ? cnxn : null)
        list = new HashSet<Watcher>(4);
        watchTable.put(path, list);
    }
    list.add(watcher);

    HashSet<String> paths = watch2Paths.get(watcher);
    if (paths == null) {
        // cnxns typically have many watches, so use default cap here
        paths = new HashSet<String>();
        watch2Paths.put(watcher, paths);
    }
    paths.add(path);
}

NIOServerCnxn.sendResponse

/**
 * Serializes a reply and queues it for sending on this connection.
 *
 * The buffer holds a 4-byte length prefix, the reply header, and the record
 * (if non-null) under the given tag. It is passed to sendBuffer(). Any
 * exception is logged rather than propagated.
 *
 * NOTE(review): if serialization throws IOException, the error is only logged
 * and whatever bytes were written are still sent — confirm this is intended.
 *
 * @param h   reply header
 * @param r   reply body, may be null
 * @param tag serialization tag for the body record
 */
synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // Make space for length
        BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
        try {
            baos.write(fourBytes);
            bos.writeRecord(h, "header");
            if (r != null) {
                bos.writeRecord(r, tag);
            }
            baos.close();
        } catch (IOException e) {
            LOG.error("Error serializing response");
        }
        byte b[] = baos.toByteArray();
        ByteBuffer bb = ByteBuffer.wrap(b);
        // Fill in the length prefix (payload size excluding the prefix itself)
        bb.putInt(b.length - 4).rewind();
        sendBuffer(bb);
        // ...
     } catch(Exception e) {
        LOG.warn("Unexpected exception. Destruction averted.", e);
     }
}

ClientCnxnSocketNIO.doIO(读取部分)

// Read half of ClientCnxnSocketNIO.doIO: once a full reply body has been read on
// an initialized connection, it is handed to SendThread.readResponse() and the
// buffers are reset to read the next length prefix.
if (sockKey.isReadable()) {
    int rc = sock.read(incomingBuffer);
    // ...
    if (!incomingBuffer.hasRemaining()) {
        incomingBuffer.flip();
        if (incomingBuffer == lenBuffer) {
            // Length prefix complete
            recvCount++;
            readLength();
        } else if (!initialized) {
            // ...
        } else {
            // Full reply body: process it (this path leads to watch registration)
            sendThread.readResponse(incomingBuffer);
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
            updateLastHeard();
        }
    }
}

SendThread.readResponse

/**
 * Processes one reply from the server.
 *
 * The elided part presumably decodes replyHdr and matches it to a packet from
 * pendingQueue. The visible part copies xid, err and zxid into the packet's
 * reply header and records a positive zxid as lastZxid. It then deserializes
 * the reply body into packet.response when there is one and err == 0.
 *
 * finishPacket() runs in a finally block, so it is called even if
 * deserialization fails.
 */
void readResponse(ByteBuffer incomingBuffer) throws IOException {
    // ...
    try {
        // ...
        packet.replyHeader.setXid(replyHdr.getXid());
        packet.replyHeader.setErr(replyHdr.getErr());
        packet.replyHeader.setZxid(replyHdr.getZxid());
        if (replyHdr.getZxid() > 0) {
            lastZxid = replyHdr.getZxid();
        }
        if (packet.response != null && replyHdr.getErr() == 0) {
            // Deserialize the reply body
            packet.response.deserialize(bbia, "response");
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Reading reply sessionid:0x"  + Long.toHexString(sessionId) + ", packet:: " + packet);
        }
    } finally {
        finishPacket(packet);
    }
}

ClientCnxn.finishPacket

/**
 * Completes a packet once its reply has been processed.
 *
 * <p>Any pending watch registration is committed first, using the server's error code.
 * For example, for exists() this is an ExistsWatchRegistration created as
 * {@code new ExistsWatchRegistration(watcher, clientPath)}. Only after that is the
 * packet handed back:
 * <ul>
 *   <li>Asynchronous request (callback present): marked finished and queued on the event thread.</li>
 *   <li>Synchronous request: marked finished and any thread waiting on the packet is woken.</li>
 * </ul>
 *
 * @param p the packet whose reply has been read
 */
private void finishPacket(Packet p) {
    if (p.watchRegistration != null) {
        p.watchRegistration.register(p.replyHeader.getErr());
    }

    if (p.cb != null) {
        // Async caller: the event thread will run the callback.
        p.finished = true;
        eventThread.queuePacket(p);
        return;
    }

    // Sync caller: wake whoever is blocked on this packet's monitor.
    synchronized (p) {
        p.finished = true;
        p.notifyAll();
    }
}

WatchRegistration.register

/**
 * Records this registration's watcher under clientPath in the map chosen by getWatches(rc).
 * Does nothing when shouldAddWatch(rc) rejects the result code.
 *
 * @param rc the error code from the server's reply
 */
public void register(int rc) {
    if (!shouldAddWatch(rc)) {
        return;
    }
    Map<String, Set<Watcher>> watchesByPath = getWatches(rc);
    // The map is shared, so lookup-or-create and add happen under its monitor.
    synchronized (watchesByPath) {
        Set<Watcher> pathWatchers = watchesByPath.get(clientPath);
        if (pathWatchers == null) {
            pathWatchers = new HashSet<Watcher>();
            watchesByPath.put(clientPath, pathWatchers);
        }
        pathWatchers.add(watcher);
    }
}

ExistsWatchRegistration.getWatches

@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
    // key->path, value->自定义watcher
    private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
    private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
    private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
    
    return rc == 0 ?  watchManager.dataWatches : watchManager.existWatches;
}

EventThread.queuePacket

/**
 * Hands a completed asynchronous packet to the event thread.
 *
 * <p>Normally the packet is simply enqueued. Once the event thread has been told to die
 * (wasKilled), the queue's monitor is taken. If the thread is still running, the packet is
 * enqueued. Otherwise it is processed directly on the caller's thread, so it is not left
 * stranded in a queue nobody drains.
 *
 * @param packet the finished packet carrying a callback
 */
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void queuePacket(Packet packet) {
    if (!wasKilled) {
        waitingEvents.add(packet);
        return;
    }
    synchronized (waitingEvents) {
        if (isRunning) {
            waitingEvents.add(packet);
        } else {
            processEvent(packet);
        }
    }
}

FinalRequestProcessor.processRequest (2)

// Excerpt from FinalRequestProcessor.processRequest: apply a transaction while holding
// the outstandingChanges monitor (rc is declared in the elided part).
synchronized (zks.outstandingChanges) {
    // ...
    if (request.hdr != null) {
       TxnHeader hdr = request.hdr;
       Record txn = request.txn;
       // Transactional request (setData/delete, etc.): apply it to the server's database.
       rc = zks.processTxn(hdr, txn);
    }
    // do not add non quorum packets to the queue.
    if (Request.isQuorum(request.type)) {
        zks.getZKDatabase().addCommittedProposal(request);
    }
}

ZooKeeperServer.processTxn

/**
 * Applies a transaction to the ZooKeeper database, then updates session tracking.
 * A createSession txn adds the session with its timeout, and closeSession removes it.
 *
 * @param hdr the transaction header (type and client session id)
 * @param txn the transaction body
 * @return the database's result for the transaction
 */
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    final int type = hdr.getType();
    final long clientSessionId = hdr.getClientId();
    final ProcessTxnResult result = getZKDatabase().processTxn(hdr, txn);
    switch (type) {
        case OpCode.createSession:
            if (!(txn instanceof CreateSessionTxn)) {
                // Unexpected body type for a createSession header.
                LOG.warn("*****>>>>> Got " + txn.getClass() + " " + txn.toString());
                break;
            }
            CreateSessionTxn sessionTxn = (CreateSessionTxn) txn;
            sessionTracker.addSession(clientSessionId, sessionTxn.getTimeOut());
            break;
        case OpCode.closeSession:
            sessionTracker.removeSession(clientSessionId);
            break;
        default:
            break;
    }
    return result;
}

ZKDatabase.processTxn

/**
 * Applies a transaction to the in-memory data tree.
 *
 * @param hdr the transaction header
 * @param txn the transaction body
 * @return the data tree's result for the transaction
 */
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    ProcessTxnResult result = dataTree.processTxn(hdr, txn);
    return result;
}

DataTree.processTxn

public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
    switch (header.getType()) {
        case OpCode.setData:
            SetDataTxn setDataTxn = (SetDataTxn) txn;
            rc.path = setDataTxn.getPath();
            rc.stat = setData(setDataTxn.getPath(), setDataTxn .getData(), setDataTxn.getVersion(), header .getZxid(), header.getTime()); 
            break;
    }
} catch (KeeperException e) {
    // ...
}

return rc;
}

DataTree.setData

/**
 * Updates a node's data (body elided) and fires NodeDataChanged on the watchers
 * registered for this path in dataWatches.
 */
public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException {
    Stat s = new Stat();
    DataNode n = nodes.get(path);
    // ...
    // Notify (and remove) the data watches registered on this path.
    dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}

WatchManager.triggerWatch

/**
 * Fires and removes every watcher registered on {@code path}.
 *
 * <p>Under the lock, the watcher set is removed from watchTable, and {@code path} is removed
 * from each watcher's entry in watch2Paths. Removing the entry is what makes a watch fire
 * only once. The watchers are then notified outside the lock.
 *
 * @param path    the node path whose watchers should fire
 * @param type    the event type delivered to the watchers
 * @param supress watchers to skip when notifying (may be null); they are still removed
 * @return the removed watchers, or null when none were registered on {@code path}
 */
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent event = new WatchedEvent(type, KeeperState.SyncConnected, path);
    HashSet<Watcher> fired;
    synchronized (this) {
        fired = watchTable.remove(path);
        boolean nothingRegistered = fired == null || fired.isEmpty();
        if (nothingRegistered) {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                        "No watchers for " + path);
            }
            return null;
        }
        // watch2Paths is the reverse index (watcher, i.e. the connection -> watched paths);
        // drop this path from it so both indexes stay consistent.
        for (Watcher w : fired) {
            HashSet<String> watchedPaths = watch2Paths.get(w);
            if (watchedPaths != null) {
                watchedPaths.remove(path);
            }
        }
    }
    for (Watcher w : fired) {
        boolean suppressed = supress != null && supress.contains(w);
        if (!suppressed) {
            w.process(event);
        }
    }
    return fired;
}

NIOServerCnxn.process

/**
 * Server-side watcher callback for a client connection: wraps the event and sends it
 * to the client as a "notification" response.
 */
synchronized public void process(WatchedEvent event) {
    // Note: xid = -1 and zxid = -1. The client recognizes a notification by xid == -1.
    ReplyHeader h = new ReplyHeader(-1, -1L, 0);
    // ...
    // Convert WatchedEvent to a type that can be sent over the wire
    WatcherEvent e = event.getWrapper();
    // Send the notification to the client.
    sendResponse(h, e, "notification");
}

SendThread.readResponse (2)

/**
 * Notification path of readResponse: a reply with xid == -1 is a watch event pushed
 * by the server. It is deserialized, its path is translated back into the client's
 * chroot-relative form, and it is queued on the event thread.
 */
void readResponse(ByteBuffer incomingBuffer) throws IOException {
    // ...
    if (replyHdr.getXid() == -1) {
        // -1 means notification
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response");

        // convert from a server path to a client path
        // (the chroot itself maps to "/"; a longer path has the chroot prefix stripped;
        // a shorter path is only logged and left unchanged)
        if (chrootPath != null) {
            String serverPath = event.getPath();
            if(serverPath.compareTo(chrootPath)==0)
                event.setPath("/");
            else if (serverPath.length() > chrootPath.length())
                event.setPath(serverPath.substring(chrootPath.length()));
            else {
                LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + chrootPath);
            }
        }

        WatchedEvent we = new WatchedEvent(event);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(sessionId));
        }

        // Hand off to the event thread, which invokes the user's watchers.
        eventThread.queueEvent( we );
        return;
    }
}

EventThread.queueEvent

/**
 * Queues a watch event for the event thread.
 *
 * <p>A connection-state event (type None) that repeats the current session state is
 * dropped. Otherwise the session state is updated, the client watch manager's
 * materialize(...) is asked for the watchers the event targets, and the
 * (watchers, event) pair is added to waitingEvents.
 *
 * @param event the event received from the server or generated locally
 */
public void queueEvent(WatchedEvent event) {
    boolean duplicateStateEvent =
            event.getType() == EventType.None && sessionState == event.getState();
    if (!duplicateStateEvent) {
        sessionState = event.getState();
        waitingEvents.add(new WatcherSetEventPair(
                watcher.materialize(event.getState(), event.getType(), event.getPath()),
                event));
    }
}

EventThread.run (2)

/**
 * Event-thread loop: blocks on waitingEvents and processes each item (watch events or
 * completed async packets). Receiving eventOfDeath only sets wasKilled; the loop-exit
 * condition is in the elided part.
 */
public void run() {
   try {
      isRunning = true;
      while (true) {
         // Blocks until an item is available.
         Object event = waitingEvents.take();
         if (event == eventOfDeath) {
            wasKilled = true;
         } else {
            processEvent(event);
         }
         // ...
      }
   } catch (InterruptedException e) {
      // NOTE(review): the interrupt status is not restored here; the thread simply exits.
      LOG.error("Event thread exiting due to interruption", e);
   }
}

EventThread.processEvent (2)

private void processEvent(Object event) {
    try {
        // 消费事件
      if (event instanceof WatcherSetEventPair) {
          // each watcher will process the event
          WatcherSetEventPair pair = (WatcherSetEventPair) event;
          for (Watcher watcher : pair.watchers) {
              try {
                  // 这里就是调用自定义watcher
                  watcher.process(pair.event);
              } catch (Throwable t) {
                  LOG.error("Error while calling watcher ", t);
              }
          }
      } else {
        // 消费 packet
        // ...
       }
    }
}
上次编辑于:
贡献者: soulballad