
Zookeeper Leader Election: Source Code Analysis


QuorumPeerMain.main

In the main method, initializeAndRun is called to initialize and start the server.

public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        // args[0]: path to zoo.cfg
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        // ...
    }
    LOG.info("Exiting normally");
    System.exit(0);
}

QuorumPeerMain.initializeAndRun

protected void initializeAndRun(String[] args) throws ConfigException, IOException{
    // Holds all parameters parsed from zoo.cfg (they are used later on)
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }

    // Start and schedule the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.servers.size() > 0) {
        runFromConfig(config); // taken when args.length == 1 and zoo.cfg defines a server list
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}

QuorumPeerConfig.parse

public void parse(String path) throws ConfigException {
    File configFile = new File(path);

    LOG.info("Reading configuration from: " + configFile);

    try {
        if (!configFile.exists()) {
            throw new IllegalArgumentException(configFile.toString()
                    + " file is missing");
        }

        Properties cfg = new Properties();
        FileInputStream in = new FileInputStream(configFile);
        try {
            cfg.load(in);
        } finally {
            in.close();
        }
        // Parse the zoo.cfg properties
        parseProperties(cfg);
    } catch (IOException e) {
        throw new ConfigException("Error processing " + path, e);
    } catch (IllegalArgumentException e) {
        throw new ConfigException("Error processing " + path, e);
    }
}

QuorumPeerConfig.parseProperties

public void parseProperties(Properties zkProp) throws IOException, ConfigException {
	int clientPort = 0;
	String clientPortAddress = null;
	for (Entry<Object, Object> entry : zkProp.entrySet()) {
		String key = entry.getKey().toString().trim();
		String value = entry.getValue().toString().trim();
		if (key.equals("dataDir")) {
			dataDir = value;
		} else if (key.equals("dataLogDir")) {
			dataLogDir = value;
		} else if (key.equals("clientPort")) {
			clientPort = Integer.parseInt(value);
		} else if (key.equals("clientPortAddress")) {
			clientPortAddress = value.trim();
		} else if (key.equals("tickTime")) {
			tickTime = Integer.parseInt(value);
		} else if (key.equals("maxClientCnxns")) {
			maxClientCnxns = Integer.parseInt(value);
		} else if (key.equals("minSessionTimeout")) {
			minSessionTimeout = Integer.parseInt(value);
		} else if (key.equals("maxSessionTimeout")) {
			maxSessionTimeout = Integer.parseInt(value);
		} else if (key.equals("initLimit")) {
			initLimit = Integer.parseInt(value);
		} else if (key.equals("syncLimit")) {
			syncLimit = Integer.parseInt(value);
		} else if (key.equals("electionAlg")) {
			electionAlg = Integer.parseInt(value);
		} else if (key.equals("quorumListenOnAllIPs")) {
			quorumListenOnAllIPs = Boolean.parseBoolean(value);
		} else if (key.equals("peerType")) {
			if (value.toLowerCase().equals("observer")) {
				peerType = LearnerType.OBSERVER;
			} else if (value.toLowerCase().equals("participant")) {
				peerType = LearnerType.PARTICIPANT;
			} else {
				throw new ConfigException("Unrecognised peertype: " + value);
			}
		} else if (key.equals( "syncEnabled" )) {
			syncEnabled = Boolean.parseBoolean(value);
		} else if (key.equals("autopurge.snapRetainCount")) {
			snapRetainCount = Integer.parseInt(value);
		} else if (key.equals("autopurge.purgeInterval")) {
			purgeInterval = Integer.parseInt(value);
		} else if (key.startsWith("server.")) {
			int dot = key.indexOf('.');
			long sid = Long.parseLong(key.substring(dot + 1));
			String parts[] = splitWithLeadingHostname(value);
			if ((parts.length != 2) && (parts.length != 3) && (parts.length !=4)) {
				LOG.error(value + " does not have the form host:port or host:port:port " + " or host:port:port:type");
			}
			LearnerType type = null;
			String hostname = parts[0];
			Integer port = Integer.parseInt(parts[1]);
			Integer electionPort = null;
			if (parts.length > 2){
				electionPort=Integer.parseInt(parts[2]);
			}
			if (parts.length > 3){
				if (parts[3].toLowerCase().equals("observer")) {
					type = LearnerType.OBSERVER;
				} else if (parts[3].toLowerCase().equals("participant")) {
					type = LearnerType.PARTICIPANT;
				} else {
					throw new ConfigException("Unrecognised peertype: " + value);
				}
			}
			if (type == LearnerType.OBSERVER){
				observers.put(Long.valueOf(sid), new QuorumServer(sid, hostname, port, electionPort, type));
			} else {
				servers.put(Long.valueOf(sid), new QuorumServer(sid, hostname, port, electionPort, type));
			}
		} else if (key.startsWith("group")) {
			// ...
		}
	}
	// ...
}
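
For reference, a minimal clustered zoo.cfg that exercises the keys parsed above might look like the following; all paths, hostnames, and ports here are illustrative placeholders, not values taken from the source.

# zoo.cfg (illustrative values)
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper/data
dataLogDir=/tmp/zookeeper/log
clientPort=2181
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
# server.<sid>=<hostname>:<quorum port>:<election port>[:<peer type>]
server.1=192.168.13.102:2888:3888
server.2=192.168.13.103:2888:3888
server.3=192.168.13.104:2888:3888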

QuorumPeerMain.runFromConfig

As the name suggests, this method starts the server from the parsed configuration.
Most of it simply copies configuration values onto the QuorumPeer; since those parameters
are not needed yet, we can skip over the setters. The key line is quorumPeer.start(),
which starts a thread, so QuorumPeer must extend Thread and therefore has a run method
that we will look at later.

public void runFromConfig(QuorumPeerConfig config) throws IOException {
      try {
          ManagedUtil.registerLog4jMBeans();
      } catch (JMException e) {
          LOG.warn("Unable to register log4j JMX control", e);
      }
  
      LOG.info("Starting quorum peer");
      try {
          ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
          cnxnFactory.configure(config.getClientPortAddress(),
                                config.getMaxClientCnxns());

          quorumPeer = getQuorumPeer();
          // used later by getView()
          quorumPeer.setQuorumPeers(config.getServers()); // the server.N entries parsed from zoo.cfg
          quorumPeer.setTxnFactory(new FileTxnSnapLog(
                  new File(config.getDataLogDir()),
                  new File(config.getDataDir())));
          // defaults to 3 if not configured
          quorumPeer.setElectionType(config.getElectionAlg());
          quorumPeer.setMyid(config.getServerId());
          quorumPeer.setTickTime(config.getTickTime());
          quorumPeer.setInitLimit(config.getInitLimit());
          quorumPeer.setSyncLimit(config.getSyncLimit());
          quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
          quorumPeer.setCnxnFactory(cnxnFactory); // set the connection factory
          quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
          quorumPeer.setClientPortAddress(config.getClientPortAddress());
          quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
          quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          quorumPeer.setLearnerType(config.getPeerType());
          quorumPeer.setSyncEnabled(config.getSyncEnabled());

          // sets quorum sasl authentication configurations
          quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
          if(quorumPeer.isQuorumSaslAuthEnabled()){
              quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
              quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
              quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
              quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
              quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
          }

          quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
          quorumPeer.initialize();

          quorumPeer.start();
          quorumPeer.join();
      } catch (InterruptedException e) {
          // warn, but generally this is ok
          LOG.warn("Quorum Peer interrupted", e);
      }
    }

ServerCnxnFactory.createFactory

static public ServerCnxnFactory createFactory() throws IOException {
    String serverCnxnFactoryName =
        System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
    if (serverCnxnFactoryName == null) {
        serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
    }
    try {
        ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
                .getDeclaredConstructor().newInstance();
        LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
        return serverCnxnFactory;
    } catch (Exception e) {
        IOException ioe = new IOException("Couldn't instantiate "
                + serverCnxnFactoryName);
        ioe.initCause(e);
        throw ioe;
    }
}

NIOServerCnxnFactory.configure

public void configure(InetSocketAddress addr, int maxcc) throws IOException {
    configureSaslLogin();

    thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
    thread.setDaemon(true);
    maxClientCnxns = maxcc;
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true); // allow the port to be reused
    LOG.info("binding to port " + addr);
    ss.socket().bind(addr); // bind to the configured IP and port
    ss.configureBlocking(false); // non-blocking mode
    ss.register(selector, SelectionKey.OP_ACCEPT); // register interest in OP_ACCEPT events
}

QuorumPeer.start

QuorumPeer.start overrides Thread.start, so before the thread itself starts it does the following:

  1. restores the snapshot data via loadDataBase
  2. cnxnFactory.start() starts the server's connection layer, meaning clients can now communicate over port 2181; we will cover that later and keep leader election as the main storyline

public synchronized void start() {
    loadDataBase(); // restore data from the snapshot and transaction logs
    cnxnFactory.start(); // connection factory: exposes the client port (2181)
    startLeaderElection(); // start leader election: launch the vote listener and initialize the FastLeaderElection algorithm
    super.start(); // QuorumPeer extends Thread, so this eventually invokes QuorumPeer.run()
}

NIOServerCnxnFactory.start / run

@Override
public void start() {
    // ensure thread is started once and only once
    // thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
    if (thread.getState() == Thread.State.NEW) {
        thread.start();
    }
}

// run method
// When a client request arrives (create/delete/setData, ...), this is the loop that picks it up
public void run() {
    while (!ss.socket().isClosed()) {
        try {
            selector.select(1000);
            Set<SelectionKey> selected;
            synchronized (this) {
                selected = selector.selectedKeys();
            }
            ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
            Collections.shuffle(selectedList);
            for (SelectionKey k : selectedList) {
                if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                    SocketChannel sc = ((ServerSocketChannel) k
                            .channel()).accept();
                    InetAddress ia = sc.socket().getInetAddress();
                    int cnxncount = getClientCnxnCount(ia);
                    if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
                        LOG.warn("Too many connections from " + ia
                                 + " - max is " + maxClientCnxns );
                        sc.close();
                    } else {
                        LOG.info("Accepted socket connection from "
                                 + sc.socket().getRemoteSocketAddress());
                        sc.configureBlocking(false);
                        SelectionKey sk = sc.register(selector,
                                SelectionKey.OP_READ);
                        NIOServerCnxn cnxn = createConnection(sc, sk);
                        sk.attach(cnxn);
                        addCnxn(cnxn);
                    }
                } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                    NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                    c.doIO(k);
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Unexpected ops in select "
                                  + k.readyOps());
                    }
                }
            }
            selected.clear();
        } catch (RuntimeException e) {
            LOG.warn("Ignoring unexpected runtime exception", e);
        } catch (Exception e) {
            LOG.warn("Ignoring exception", e);
        }
    }
    closeAll();
    LOG.info("NIOServerCnxn factory exited run method");
}

QuorumPeer.startLeaderElection

Seeing this method should make your eyes light up: after all the groundwork above, we finally reach the leader-election logic.

synchronized public void startLeaderElection() {
    try {
        // Build the initial vote (myid, zxid, epoch) used for voting
        currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
    } catch(IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    // TODO: a question to come back to later
    for (QuorumServer p : getView().values()) {
        if (p.id == myid) {
            myQuorumAddr = p.addr; // e.g. for myid=1, myQuorumAddr=192.168.13.102
            break;
        }
    }
    if (myQuorumAddr == null) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    if (electionType == 0) { // election strategy
        try {
            udpSocket = new DatagramSocket(myQuorumAddr.getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    // create the election algorithm for the configured type (default 3 = FastLeaderElection)
    this.electionAlg = createElectionAlgorithm(electionType);
}

QuorumPeer.createElectionAlgorithm

Creates the election algorithm that corresponds to the configured identifier (electionAlg).

protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;
            
    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        // Election-related: manages the connections used to exchange votes
        qcm = createCnxnManager();
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            listener.start();
            // create the FastLeaderElection algorithm
            le = new FastLeaderElection(this, qcm);
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}

QuorumCnxManager constructor (invoked via QuorumPeer.createCnxnManager)

public QuorumCnxManager(final long mySid,
                        Map<Long,QuorumPeer.QuorumServer> view,
                        QuorumAuthServer authServer,
                        QuorumAuthLearner authLearner,
                        int socketTimeout,
                        boolean listenOnAllIPs,
                        int quorumCnxnThreadsSize,
                        boolean quorumSaslAuthEnabled,
                        ConcurrentHashMap<Long, SendWorker> senderWorkerMap) {
    this.senderWorkerMap = senderWorkerMap;
    // queue of received messages
    this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
    // per-peer queues of messages waiting to be sent
    this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
    this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
    String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
    if(cnxToValue != null){
        this.cnxTO = Integer.parseInt(cnxToValue);
    }

    this.mySid = mySid;
    this.socketTimeout = socketTimeout;
    this.view = view;
    this.listenOnAllIPs = listenOnAllIPs;

    initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
            quorumSaslAuthEnabled);

    // Starts listener thread that waits for connection requests 
    listener = new Listener();
}

QuorumCnxManager.Listener.run

// The listener is a thread; what actually executes is the run method below
@Override
public void run() {
    int numRetries = 0;
    InetSocketAddress addr;
    while((!shutdown) && (numRetries < 3)){
        try {
            ss = new ServerSocket(); // plain blocking socket I/O, not NIO
            ss.setReuseAddress(true);
            if (listenOnAllIPs) {
                int port = view.get(QuorumCnxManager.this.mySid)
                    .electionAddr.getPort();
                addr = new InetSocketAddress(port);
            } else {
                addr = view.get(QuorumCnxManager.this.mySid).electionAddr;
            }
            LOG.info("My election bind port: " + addr.toString());
            setName(view.get(QuorumCnxManager.this.mySid).electionAddr.toString());
            ss.bind(addr);
            while (!shutdown) {
                Socket client = ss.accept();
                setSockOpts(client);
                LOG.info("Received connection request "
                        + client.getRemoteSocketAddress());

                // Receive and handle the connection request
                // asynchronously if the quorum sasl authentication is
                // enabled. This is required because sasl server
                // authentication process may take few seconds to finish,
                // this may delay next peer connection requests.
                if (quorumSaslAuthEnabled) {
                    receiveConnectionAsync(client);
                } else {
                    receiveConnection(client);
                }

                numRetries = 0;
            }
        } catch (IOException e) {
            LOG.error("Exception while listening", e);
            numRetries++;
            try {
                ss.close();
                Thread.sleep(1000);
            } catch (IOException ie) {
                LOG.error("Error closing server socket", ie);
            } catch (InterruptedException ie) {
                LOG.error("Interrupted while sleeping. " +
                          "Ignoring exception", ie);
            }
        }
    }
    LOG.info("Leaving listener");
    if (!shutdown) {
        LOG.error("As I'm leaving the listener thread, "
                + "I won't be able to participate in leader "
                + "election any longer: "
                + view.get(QuorumCnxManager.this.mySid).electionAddr);
    }
}

QuorumCnxManager.receiveConnection

public void receiveConnection(final Socket sock) {
    DataInputStream din = null;
    try {
        din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
        handleConnection(sock, din);
    } catch (IOException e) {
        LOG.error("Exception handling connection, addr: {}, closing server connection",
                 sock.getRemoteSocketAddress());
        closeSocket(sock);
    }
}

QuorumCnxManager.handleConnection

private void handleConnection(Socket sock, DataInputStream din) throws IOException {
    Long sid = null;
    try {
        // Read server id
        sid = din.readLong();
        if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
            sid = din.readLong();
            // next comes the #bytes in the remainder of the message
            // note that 0 bytes is fine (old servers)
            int num_remaining_bytes = din.readInt();
            if (num_remaining_bytes < 0 || num_remaining_bytes > maxBuffer) {
                LOG.error("Unreasonable buffer length: {}", num_remaining_bytes);
                closeSocket(sock);
                return;
            }
            byte[] b = new byte[num_remaining_bytes];

            // remove the remainder of the message from din
            int num_read = din.read(b);
            if (num_read != num_remaining_bytes) {
                LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
            }
        }
        if (sid == QuorumPeer.OBSERVER_ID) {
            /*
             * Choose identifier at random. We need a value to identify
             * the connection.
             */
            sid = observerCounter.getAndDecrement();
            LOG.info("Setting arbitrary identifier to observer: " + sid);
        }
    } catch (IOException e) {
        closeSocket(sock);
        LOG.warn("Exception reading or writing challenge: " + e.toString());
        return;
    }

    // do authenticating learner
    LOG.debug("Authenticating learner server.id: {}", sid);
    // authenticate the peer
    authServer.authenticate(sock, din);

    //If wins the challenge, then close the new connection.
    // Only the server with the larger sid keeps the connection; if the initiator's sid is smaller, close it and connect back ourselves
    if (sid < this.mySid) {
        /*
         * This replica might still believe that the connection to sid is
         * up, so we have to shut down the workers before trying to open a
         * new connection.
         */
        SendWorker sw = senderWorkerMap.get(sid);
        if (sw != null) {
            sw.finish();
        }

        /*
         * Now we start a new connection
         */
        LOG.debug("Create new connection to server: " + sid);
        closeSocket(sock);
        // connect back to the server identified by sid
        connectOne(sid);

        // Otherwise start worker threads to receive data.
    } else {
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);
        
        if(vsw != null)
            vsw.finish();
        
        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
        
        sw.start();
        rw.start();
        
        return;
    }
}

SendWorker.run

@Override
public void run() {
    threadCnt.incrementAndGet();
    try {
        /**
         * If there is nothing in the queue to send, then we
         * send the lastMessage to ensure that the last message
         * was received by the peer. The message could be dropped
         * in case self or the peer shutdown their connection
         * (and exit the thread) prior to reading/processing
         * the last message. Duplicate messages are handled correctly
         * by the peer.
         *
         * If the send queue is non-empty, then we have a recent
         * message than that stored in lastMessage. To avoid sending
         * stale message, we should send the message in the send queue.
         */
        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
        if (bq == null || isSendQueueEmpty(bq)) {
           ByteBuffer b = lastMessageSent.get(sid);
           if (b != null) {
               LOG.debug("Attempting to send lastMessage to sid=" + sid);
               send(b);
           }
        }
    } catch (IOException e) {
        LOG.error("Failed to send last message. Shutting down thread.", e);
        this.finish();
    }
    
    try {
        while (running && !shutdown && sock != null) {

            ByteBuffer b = null;
            try {
                ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                if (bq != null) {
                    // take the next message from the send queue
                    b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                } else {
                    LOG.error("No queue of incoming messages for " +
                              "server " + sid);
                    break;
                }

                if(b != null){
                    lastMessageSent.put(sid, b);
                    // send it to the peer
                    send(b);
                }
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for message on queue",
                        e);
            }
        }
    } catch (Exception e) {
        LOG.warn("Exception when using channel: for id " + sid
                 + " my id = " + QuorumCnxManager.this.mySid
                 + " error = " + e);
    }
    this.finish();
    LOG.warn("Send worker leaving thread");
}

SendWorker.send

synchronized void send(ByteBuffer b) throws IOException {
    byte[] msgBytes = new byte[b.capacity()];
    try {
        b.position(0);
        b.get(msgBytes);
    } catch (BufferUnderflowException be) {
        LOG.error("BufferUnderflowException ", be);
        return;
    }
    dout.writeInt(b.capacity());
    dout.write(b.array());
    dout.flush();
}

RecvWorker.run

@Override
public void run() {
    threadCnt.incrementAndGet();
    try {
        while (running && !shutdown && sock != null) {
            /**
             * Reads the first int to determine the length of the
             * message
             */
            int length = din.readInt();
            if (length <= 0 || length > PACKETMAXSIZE) {
                throw new IOException(
                        "Received packet with invalid packet: " + length);
            }
            /**
             * Allocates a new ByteBuffer to receive the message
             */
            byte[] msgArray = new byte[length];
            din.readFully(msgArray, 0, length);
            ByteBuffer message = ByteBuffer.wrap(msgArray);
            addToRecvQueue(new Message(message.duplicate(), sid));
        }
    } catch (Exception e) {
        LOG.warn("Connection broken for id " + sid + ", my id = "
                 + QuorumCnxManager.this.mySid + ", error = " , e);
    } finally {
        LOG.warn("Interrupting SendWorker");
        sw.finish();
        if (sock != null) {
            closeSocket(sock);
        }
    }
}

QuorumCnxManager.addToRecvQueue

public void addToRecvQueue(Message msg) {
    synchronized(recvQLock) {
        if (recvQueue.remainingCapacity() == 0) {
            try {
                recvQueue.remove();
            } catch (NoSuchElementException ne) {
                // element could be removed by poll()
                 LOG.debug("Trying to remove from an empty " +
                     "recvQueue. Ignoring exception " + ne);
            }
        }
        try {
            recvQueue.add(msg);
        } catch (IllegalStateException ie) {
            // This should never happen
            LOG.error("Unable to insert element in the recvQueue " + ie);
        }
    }
}

FastLeaderElection

This is the FastLeaderElection constructor. QuorumCnxManager is a central object here: it implements the network connection management used during leader election, and we will come back to it later.

public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}

FastLeaderElection.starter

The starter method sets a few member fields, builds two blocking queues, sendqueue and recvqueue, and instantiates a Messenger.

private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    sendqueue = new LinkedBlockingQueue<ToSend>();
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}

Messenger

The Messenger constructor builds two threads, a WorkerSender and a WorkerReceiver, which send and receive election messages respectively. We will not analyze what they do just yet.

Messenger(QuorumCnxManager manager) {

    this.ws = new WorkerSender(manager); // thread that sends votes (consumes sendqueue)

    Thread t = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
    t.setDaemon(true); // daemon thread
    t.start();

    this.wr = new WorkerReceiver(manager); // thread that receives votes

    t = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
    t.setDaemon(true);
    t.start();
}

QuorumPeer.run

@Override
public void run() {
    setName("QuorumPeer" + "[myid=" + getId() + "]" +
            cnxnFactory.getLocalAddress());

    LOG.debug("Starting quorum peer");
    try {
        jmxQuorumBean = new QuorumBean(this);
        MBeanRegistry.getInstance().register(jmxQuorumBean, null);
        for(QuorumServer s: getView().values()){
            ZKMBeanInfo p;
            if (getId() == s.id) {
                p = jmxLocalPeerBean = new LocalPeerBean(this);
                try {
                    MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                    jmxLocalPeerBean = null;
                }
            } else {
                p = new RemotePeerBean(s);
                try {
                    MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                }
            }
        }
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        jmxQuorumBean = null;
    }

    try {
        /*
         * Main loop
         * 死循环
         */
        while (running) {
            switch (getPeerState()) { // on first startup the state is LOOKING
            case LOOKING:
                LOG.info("LOOKING");

                if (Boolean.getBoolean("readonlymode.enabled")) {
                    LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                    // Create read-only server but don't start it immediately
                    final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                            logFactory, this,
                            new ZooKeeperServer.BasicDataTreeBuilder(),
                            this.zkDb);

                    // Instead of starting roZk immediately, wait some grace
                    // period before we decide we're partitioned.
                    //
                    // Thread is used here because otherwise it would require
                    // changes in each of election strategy classes which is
                    // unnecessary code coupling.
                    Thread roZkMgr = new Thread() {
                        public void run() {
                            try {
                                // lower-bound grace period to 2 secs
                                sleep(Math.max(2000, tickTime));
                                if (ServerState.LOOKING.equals(getPeerState())) {
                                    roZk.startup();
                                }
                            } catch (InterruptedException e) {
                                LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                            } catch (Exception e) {
                                LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                            }
                        }
                    };
                    try {
                        roZkMgr.start();
                        setBCVote(null);
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                        setPeerState(ServerState.LOOKING);
                    } finally {
                        // If the thread is in the grace period, interrupt
                        // to come out of waiting.
                        roZkMgr.interrupt();
                        roZk.shutdown();
                    }
                } else {
                    try {
                        setBCVote(null);
                        // setCurrentVote: at this point the leader has been determined
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:
                try {
                    LOG.info("OBSERVING");
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e );                        
                } finally {
                    observer.shutdown();
                    setObserver(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            case FOLLOWING:
                try {
                    LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    follower.followLeader(); // connect to the leader and start following
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            case LEADING:
                LOG.info("LEADING");
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead(); // run the leading logic
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    setPeerState(ServerState.LOOKING);
                }
                break;
            }
        }
    } finally {
        LOG.warn("QuorumPeer main thread exited");
        try {
            MBeanRegistry.getInstance().unregisterAll();
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        jmxQuorumBean = null;
        jmxLocalPeerBean = null;
    }
}

FastLeaderElection.lookForLeader

public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(
                self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }
    if (self.start_fle == 0) {
       self.start_fle = Time.currentElapsedTime();
    }
    try {
        // collection of votes received from other servers in this election round
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

        // votes from servers that are already FOLLOWING or LEADING, i.e. outside the current election
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = finalizeWait;

        synchronized(this){
            // logical clock, i.e. the election epoch
            logicalclock.incrementAndGet();
            // initial proposal: vote for ourselves (myid, last logged zxid, current epoch)
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id =  " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications(); // broadcast our own vote to all voting peers

        /*
         * Loop in which we exchange notifications until we find a leader
         */

        // keep exchanging votes while this peer is still LOOKING
        while ((self.getPeerState() == ServerState.LOOKING) &&
                (!stop)){
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            // recvqueue holds Notifications received from other servers over the network
            Notification n = recvqueue.poll(notTimeout,
                    TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if(n == null){
                if(manager.haveDelivered()){
                    sendNotifications();
                } else {
                    manager.connectAll(); // reconnect to all nodes in the cluster
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout*2;
                notTimeout = (tmpTimeOut < maxNotificationInterval?
                        tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            }

            else if (validVoter(n.sid) && validVoter(n.leader)) { // check that this is a valid vote (sender and proposed leader are both voters)
                /*
                 * Only proceed if the vote comes from a replica in the
                 * voting view for a replica in the voting view.
                 */
                switch (n.state) {
                case LOOKING: // on a fresh start every server enters this case
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) { // the notification is from a newer election round
                        logicalclock.set(n.electionEpoch);
                        recvset.clear(); // votes from the older round are discarded
                        // After receiving a vote, decide whose proposal this server should follow:
                        // it could end up following server 1, server 2, or server 3.
                        // This is the vote comparison at the heart of the ZAB leader election.
                        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            // Adopt the peer's vote; the next notifications we send will carry it
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            // The received vote loses to ours, so keep proposing our own vote
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        // broadcast the (possibly updated) proposal again
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) { // the notification belongs to an older round; ignore it
                        if(LOG.isDebugEnabled()){
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if(LOG.isDebugEnabled()){
                        LOG.debug("Adding vote: from=" + n.sid +
                                ", proposed leader=" + n.leader +
                                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                    // Decision point: tally this node's updated proposal against the votes collected in recvset and check for a quorum
                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock.get(), proposedEpoch))) {

                        // Verify if there is any change in the proposed leader
                        while((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null){
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)){
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING: learningState());

                            Vote endVote = new Vote(proposedLeader,
                                                    proposedZxid,
                                                    logicalclock.get(),
                                                    proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if(n.electionEpoch == logicalclock.get()){
                        recvset.put(n.sid, new Vote(n.leader,
                                                      n.zxid,
                                                      n.electionEpoch,
                                                      n.peerEpoch));
                       
                        if(ooePredicate(recvset, outofelection, n)) {
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());

                            Vote endVote = new Vote(n.leader, 
                                    n.zxid, 
                                    n.electionEpoch, 
                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify
                     * a majority is following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version,
                                                        n.leader,
                                                        n.zxid,
                                                        n.electionEpoch,
                                                        n.peerEpoch,
                                                        n.state));
       
                    if(ooePredicate(outofelection, outofelection, n)) {
                        synchronized(this){
                            logicalclock.set(n.electionEpoch);
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());
                        }
                        Vote endVote = new Vote(n.leader,
                                                n.zxid,
                                                n.electionEpoch,
                                                n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                            n.state, n.sid);
                    break;
                }
            } else {
                if (!validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                }
                if (!validVoter(n.sid)) {
                    LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                }
            }
        }
        return null;
    } finally {
        try {
            if(self.jmxLeaderElectionBean != null){
                MBeanRegistry.getInstance().unregister(
                        self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}",
                manager.getConnectionThreadCount());
    }
}

FastLeaderElection.updateProposal

synchronized void updateProposal(long leader, long zxid, long epoch){
    if(LOG.isDebugEnabled()){
        LOG.debug("Updating proposal: " + leader + " (newleader), 0x"
                + Long.toHexString(zxid) + " (newzxid), " + proposedLeader
                + " (oldleader), 0x" + Long.toHexString(proposedZxid) + " (oldzxid)");
    }
    proposedLeader = leader;
    proposedZxid = zxid;
    proposedEpoch = epoch;
}

FastLeaderElection.sendNotifications

private void sendNotifications() {
    for (QuorumServer server : self.getVotingView().values()) {
        long sid = server.id;

        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader, // myid of the proposed leader
                proposedZxid, // zxid of the proposed leader
                logicalclock.get(), // election epoch (logical clock)
                QuorumPeer.ServerState.LOOKING, // this peer's current state
                sid, // sid of the recipient
                proposedEpoch); // epoch of the proposed leader
        if(LOG.isDebugEnabled()){
            LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                  Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get())  +
                  " (n.round), " + sid + " (recipient), " + self.getId() +
                  " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
        }
        sendqueue.offer(notmsg); // blocking queue consumed by the WorkerSender thread (producer/consumer pattern)
    }
}

FastLeaderElection.totalOrderPredicate

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
            Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
    if(self.getQuorumVerifier().getWeight(newId) == 0){
        return false;
    }
    
    /*
     * We return true if one of the following three cases hold:
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */
    return ((newEpoch > curEpoch) || 
            ((newEpoch == curEpoch) &&
            ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
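
To make the ordering concrete, here is a small standalone sketch (not part of the ZooKeeper code base; the class and method names are made up for illustration) that re-implements the same three-level comparison and applies it to a few hypothetical votes. With equal epochs and equal zxids, the vote from the server with the larger myid wins.

public class TotalOrderPredicateDemo {

    // Same three-level ordering as FastLeaderElection.totalOrderPredicate
    // (epoch first, then zxid, then server id), without the QuorumVerifier weight check.
    static boolean wins(long newId, long newZxid, long newEpoch,
                        long curId, long curZxid, long curEpoch) {
        return (newEpoch > curEpoch)
                || ((newEpoch == curEpoch)
                    && ((newZxid > curZxid)
                        || ((newZxid == curZxid) && (newId > curId))));
    }

    public static void main(String[] args) {
        // Fresh cluster: both votes carry zxid=0 and epoch=0, so the larger myid wins.
        System.out.println(wins(2, 0x0, 0, 1, 0x0, 0));      // true
        // Equal epochs: a higher zxid beats a higher myid.
        System.out.println(wins(1, 0x101, 1, 3, 0x100, 1));  // true
        // A higher epoch beats everything else.
        System.out.println(wins(1, 0x0, 2, 3, 0xfff, 1));    // true
    }
}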

FastLeaderElection.termPredicate

protected boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {

    HashSet<Long> set = new HashSet<Long>();

    /*
     * First make the views consistent. Sometimes peers will have
     * different zxids for a server depending on timing.
     *
     */
    for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
        if (vote.equals(entry.getValue())){ // tally the votes that match our proposal
            set.add(entry.getKey()); // e.g. if two servers cast this vote, the set holds two sids
        }
    }

    return self.getQuorumVerifier().containsQuorum(set); // check whether they form a quorum
}

QuorumMaj.containsQuorum

public boolean containsQuorum(Set<Long> set){
    return (set.size() > half); // is the tally greater than half? e.g. 2 > 1 in a 3-node ensemble; the same rule is used for leader election and data synchronization
}
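
As a quick check of the arithmetic (assuming, as in QuorumMaj, that half is the integer division of the voting-member count by 2, which matches the comparison above), a tiny sketch with made-up helper names:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class QuorumMathDemo {

    // Mirrors the rule above: a quorum is any vote set strictly larger than half = n / 2.
    static boolean containsQuorum(Set<Long> sids, int ensembleSize) {
        int half = ensembleSize / 2;
        return sids.size() > half;
    }

    public static void main(String[] args) {
        Set<Long> twoVotes = new HashSet<>(Arrays.asList(1L, 2L));
        System.out.println(containsQuorum(twoVotes, 3)); // true:  2 > 3/2 = 1
        System.out.println(containsQuorum(twoVotes, 5)); // false: 2 > 5/2 = 2 does not hold
    }
}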

QuorumPeer.makeObserver

protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
    return new Observer(this, new ObserverZooKeeperServer(logFactory,
            this, new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
}

Observer.observeLeader

void observeLeader() throws InterruptedException {
    zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);

    try {
        QuorumServer leaderServer = findLeader();
        LOG.info("Observing " + leaderServer.addr);
        try {
            connectToLeader(leaderServer.addr, leaderServer.hostname);
            long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);

            syncWithLeader(newLeaderZxid);
            QuorumPacket qp = new QuorumPacket();
            while (this.isRunning()) {
                readPacket(qp);
                processPacket(qp);                   
            }
        } catch (Exception e) {
            LOG.warn("Exception when observing the leader", e);
            try {
                sock.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

            // clear pending revalidations
            pendingRevalidations.clear();
        }
    } finally {
        zk.unregisterJMX(this);
    }
}

QuorumPeer.makeFollower

protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
    return new Follower(this, new FollowerZooKeeperServer(logFactory, 
            this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
}

Follower.followLeader

void followLeader() throws InterruptedException {
    self.end_fle = Time.currentElapsedTime();
    long electionTimeTaken = self.end_fle - self.start_fle;
    self.setElectionTimeTaken(electionTimeTaken);
    LOG.info("FOLLOWING - LEADER ELECTION TOOK - {}", electionTimeTaken);
    self.start_fle = 0;
    self.end_fle = 0;
    fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
    try {
        QuorumServer leaderServer = findLeader();            
        try {
            // the follower connects to the leader
            connectToLeader(leaderServer.addr, leaderServer.hostname);
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

            //check to see if the leader zxid is lower than ours
            //this should never happen but is just a safety check
            long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
            if (newEpoch < self.getAcceptedEpoch()) {
                LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                        + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                throw new IOException("Error: Epoch of leader is lower");
            }
            syncWithLeader(newEpochZxid);                
            QuorumPacket qp = new QuorumPacket();
            while (this.isRunning()) {
                readPacket(qp);
                processPacket(qp);
            }
        } catch (Exception e) {
            LOG.warn("Exception when following the leader", e);
            try {
                sock.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

            // clear pending revalidations
            pendingRevalidations.clear();
        }
    } finally {
        zk.unregisterJMX((Learner)this);
    }
}

QuorumPeer.makeLeader

protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
    return new Leader(this, new LeaderZooKeeperServer(logFactory,
            this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
}

Leader.lead

void lead() throws IOException, InterruptedException {
    self.end_fle = Time.currentElapsedTime();
    long electionTimeTaken = self.end_fle - self.start_fle;
    self.setElectionTimeTaken(electionTimeTaken);
    LOG.info("LEADING - LEADER ELECTION TOOK - {}", electionTimeTaken);
    self.start_fle = 0;
    self.end_fle = 0;

    zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

    try {
        self.tick.set(0);
        zk.loadData();
        
        leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

        // Start thread that waits for connection requests from 
        // new followers.
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();
        
        readyToStart = true;
        long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
        
        zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
        
        synchronized(this){
            lastProposed = zk.getZxid();
        }
        
        newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);

        if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
            LOG.info("NEWLEADER proposal has Zxid of "
                    + Long.toHexString(newLeaderProposal.packet.getZxid()));
        }
        
        waitForEpochAck(self.getId(), leaderStateSummary);
        self.setCurrentEpoch(epoch);

        // We have to get at least a majority of servers in sync with
        // us. We do this by waiting for the NEWLEADER packet to get
        // acknowledged
        try {
            waitForNewLeaderAck(self.getId(), zk.getZxid());
        } catch (InterruptedException e) {
            shutdown("Waiting for a quorum of followers, only synced with sids: [ "
                    + getSidSetString(newLeaderProposal.ackSet) + " ]");
            HashSet<Long> followerSet = new HashSet<Long>();
            for (LearnerHandler f : learners)
                followerSet.add(f.getSid());
                
            if (self.getQuorumVerifier().containsQuorum(followerSet)) {
                LOG.warn("Enough followers present. "
                        + "Perhaps the initTicks need to be increased.");
            }
            Thread.sleep(self.tickTime);
            self.tick.incrementAndGet();
            return;
        }
        
        startZkServer();
        
        /**
         * WARNING: do not use this for anything other than QA testing
         * on a real cluster. Specifically to enable verification that quorum
         * can handle the lower 32bit roll-over issue identified in
         * ZOOKEEPER-1277. Without this option it would take a very long
         * time (on order of a month say) to see the 4 billion writes
         * necessary to cause the roll-over to occur.
         * 
         * This field allows you to override the zxid of the server. Typically
         * you'll want to set it to something like 0xfffffff0 and then
         * start the quorum, run some operations and see the re-election.
         */
        String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
        if (initialZxid != null) {
            long zxid = Long.parseLong(initialZxid);
            zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
        }
        
        if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
            self.cnxnFactory.setZooKeeperServer(zk);
        }
        // Everything is a go, simply start counting the ticks
        // WARNING: I couldn't find any wait statement on a synchronized
        // block that would be notified by this notifyAll() call, so
        // I commented it out
        //synchronized (this) {
        //    notifyAll();
        //}
        // We ping twice a tick, so we only update the tick every other
        // iteration
        boolean tickSkip = true;

        while (true) {
            Thread.sleep(self.tickTime / 2);
            if (!tickSkip) {
                self.tick.incrementAndGet();
            }
            HashSet<Long> syncedSet = new HashSet<Long>();

            // lock on the followers when we use it.
            syncedSet.add(self.getId());

            for (LearnerHandler f : getLearners()) {
                // Synced set is used to check we have a supporting quorum, so only
                // PARTICIPANT, not OBSERVER, learners should be used
                if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
                    syncedSet.add(f.getSid());
                }
                f.ping();
            }

            // check leader running status
            if (!this.isRunning()) {
                shutdown("Unexpected internal error");
                return;
            }

          if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
            //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
                // Lost quorum, shutdown
                shutdown("Not sufficient followers synced, only synced with sids: [ "
                        + getSidSetString(syncedSet) + " ]");
                // make sure the order is the same!
                // the leader goes to looking
                return;
          } 
          tickSkip = !tickSkip;
        }
    } finally {
        zk.unregisterJMX(this);
    }
}