ZooKeeper watcher mechanism: a source-code walkthrough
Contents
- ZooKeeper initialization and the exists call
  - new ZooKeeper: initialize the ZooKeeper client
    - new ClientCnxn: initialize the client connection (cnxn)
      - getClientCnxnSocket: create the ClientCnxnSocketNIO
    - cnxn.start: call ClientCnxn.start
      - sendThread.start: start the SendThread
        - sendPing: heartbeats
          - queuePacket: add the packet to the outgoing queue
            - clientCnxnSocket.doTransport
              - doIO: ClientCnxnSocketNIO reads and writes; the write path comes first
                - p.createBB: build the bytes to send
                - sock.write: send the message with SocketChannel.write
      - eventThread.start: start the EventThread
        - processEvent: handle messages from the blocking queue
          - watcher.process: left as an open question for now
  - zooKeeper.exists
- Server-side watch registration: next, look at the cnxnFactory seen earlier
  - cnxnFactory.start: start the cnxnFactory thread; NIO listens on port 2181
    - c.doIO(k): perform I/O
      - readPayload: the read path (the client side was writing)
        - readRequest: read the request data
          - zkServer.processPacket: process the packet
            - submitRequest: submit the request to firstProcessor, which is initialized in setupRequestProcessors. That method builds the chain of responsibility PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor; here it is called from ZooKeeperServer, and there are other variants (the follower, leader, and observer chains)
            - firstProcessor.processRequest: enter the chain; the request is added to a blocking queue
              - PrepRequestProcessor.run: consume the blocking queue
                - pRequest: handle the request on this thread
                  - nextProcessor.processRequest: hand off to the next processor's blocking queue
                    - SyncRequestProcessor.run: consume its queue and call the next processor
                      - nextProcessor.processRequest: the last node in the chain
                        - zks.getZKDatabase().statNode: read the node's Stat on the server
                          - dataTree.statNode
                            - dataWatches.addWatch: store the watch in a HashMap; what is stored is actually the cnxn
                        - cnxn.sendResponse: send the result to the client
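- Client-side watch registration: after the server sends its result, the client has to read it. Earlier, in ClientCnxnSocketNIO.doIO, we only looked at the isWritable branch; there is also an isReadable branch
  - doIO: ClientCnxnSocketNIO reads and writes; this time the read path
    - sendThread.readResponse: read the server's response
      - finishPacket: register the local watch
        - p.watchRegistration.register: register the watcher locally
          - getWatches: get the map of registered watchers
        - eventThread.queuePacket: add to the blocking queue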
- How is a watch event triggered? When FinalRequestProcessor receives a transactional request (setData/delete/create), it runs the following logic:
  - nextProcessor.processRequest: handle the transactional request
    - zks.processTxn
      - getZKDatabase().processTxn(hdr, txn)
        - dataTree.processTxn
          - setData: using setData as the example
            - dataWatches.triggerWatch: trigger the watch event
              - w.process: send the notification to the client
- How does the client receive and handle the notification? Look again at the isReadable branch of ClientCnxnSocketNIO.doIO
  - doIO: the read path; the server sends xid=-1, zxid=-1
    - eventThread.queueEvent: add to the blocking event queue, which EventThread.run consumes
      - eventThread.run: consume the blocking queue
        - processEvent: the handler
          - watcher.process --> TestWatcher.process: invoke the custom watcher

Principle: the client keeps the watches it has registered. When the server fires an event, it sends a notification to the client, which looks up the matching watchers in its local map by path and calls their process method.
new ZooKeeper
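When creating a ZooKeeper client instance, we pass a default Watcher (for example `new Watcher() {...}`) to the constructor. This Watcher serves as the default Watcher for the whole session and is kept in the defaultWatcher field of the client-side ZKWatchManager. The code is: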
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly) throws IOException {
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
// the watcher passed into the constructor becomes the session's default watcher
watchManager.defaultWatcher = watcher;
// connectString: comma-separated host:port list, optionally followed by a chroot path
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
// client-side networking component
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}
new ClientCnxn
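ClientCnxn is the main class for communication and event notification between the ZooKeeper client and server. It contains two inner classes:
- SendThread: handles data exchange between client and server, including the transmission of event notifications
- EventThread: invokes the registered Watchers on the client side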
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
}
ZooKeeper.getClientCnxnSocket
private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
String clientCnxnSocketName = System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
if (clientCnxnSocketName == null) {
// default to NIO
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
try {
return (ClientCnxnSocket) Class.forName(clientCnxnSocketName).getDeclaredConstructor().newInstance();
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate " + clientCnxnSocketName);
ioe.initCause(e);
throw ioe;
}
}
ClientCnxn.start
public void start() {
sendThread.start();
eventThread.start();
}
SendThread.run
@Override
public void run() {
clientCnxnSocket.introduce(this,sessionId);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
while (state.isAlive()) {
try {
// ...
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
// ...
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
} catch (Throwable e) {
// ...
}
}
cleanup();
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(), "SendThread exited loop for session: 0x"+ Long.toHexString(getSessionId()));
}
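The ping schedule in SendThread.run is plain arithmetic. The sketch below restates it as pure functions so the numbers can be checked; `PingSchedule` and its method names are hypothetical, and only the formula and the 10-second cap come from the code above.

```java
/** Hypothetical helper restating the ping arithmetic from SendThread.run. */
final class PingSchedule {
    /** Same cap as SendThread: ping at least every 10 seconds of send-idleness. */
    static final int MAX_SEND_PING_INTERVAL = 10_000;

    private PingSchedule() {
    }

    /** Milliseconds until the next ping is due; a value <= 0 means "ping now". */
    static int timeToNextPing(int readTimeout, int idleSend) {
        // The extra second avoids missing the second ping because of timing races.
        return readTimeout / 2 - idleSend - (idleSend > 1000 ? 1000 : 0);
    }

    /** The condition that guards sendPing() in SendThread.run. */
    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0
                || idleSend > MAX_SEND_PING_INTERVAL;
    }

    /** Otherwise the select() timeout "to" is shortened so the thread wakes up in time. */
    static int nextSelectTimeout(int to, int readTimeout, int idleSend) {
        int next = timeToNextPing(readTimeout, idleSend);
        return next < to ? next : to;
    }
}
```

For example, with readTimeout = 20000 ms (sessionTimeout 30000 ms times 2/3) and 8 s of idleness, timeToNextPing is 20000 / 2 - 8000 - 1000 = 1000 ms, so select() waits at most one more second.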
SendThread.sendPing
private void sendPing() {
lastPingSentNs = System.nanoTime();
RequestHeader h = new RequestHeader(-2, OpCode.ping);
queuePacket(h, null, null, null, null, null, null, null, null);
}
ClientCnxn.queuePacket
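Wraps the request in a Packet and appends it to outgoingQueue (or fails it with a connection-loss error if the client is closing), then wakes up the selector.
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) {
Packet packet = null;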
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
synchronized (outgoingQueue) {
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
outgoingQueue.add(packet);
}
}
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
ClientCnxnSocketNIO.doTransport
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn) throws IOException, InterruptedException {
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0){
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
synchronized(outgoingQueue) {
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
enableWrite();
}
}
}
selected.clear();
}
ClientCnxnSocketNIO.doIO
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
// read
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"+ Long.toHexString(sessionId) + ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount++;
readLength();
} else if (!initialized) {
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
enableWrite();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
// write
if (sockKey.isWritable()) {
synchronized(outgoingQueue) {
Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();
}
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount++;
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
disableWrite();
} else {
enableWrite();
}
}
}
}
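The write branch assigns the xid only when a packet is actually written, and only packets that expect a matched reply move to pendingQueue. A simplified model, assuming each packet is written in a single call (the real code keeps a partially written p.bb in outgoingQueue until it drains); `SimplePacket`, `WriteModel` and the opcode constants are illustrative stand-ins, not ZooKeeper classes:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Illustrative packet: just an opcode and an xid. */
final class SimplePacket {
    final int type;
    int xid; // stays as created (e.g. -2 for ping) unless assigned at send time

    SimplePacket(int type, int xid) {
        this.type = type;
        this.xid = xid;
    }
}

/** Simplified model of the isWritable branch of ClientCnxnSocketNIO.doIO. */
final class WriteModel {
    // Local stand-ins for opcodes; only "is it ping/auth or not" matters here.
    static final int EXISTS = 3;
    static final int PING = 11;
    static final int AUTH = 100;

    final Deque<SimplePacket> outgoingQueue = new ArrayDeque<>();
    final List<SimplePacket> pendingQueue = new ArrayList<>();
    private int nextXid = 1; // hypothetical counter playing the role of cnxn.getXid()

    /** "Writes" the head packet, assuming it goes out completely in one write. */
    SimplePacket writeOne() {
        SimplePacket p = outgoingQueue.pollFirst();
        if (p == null) {
            return null; // nothing to send; the real code would disableWrite()
        }
        if (p.type != PING && p.type != AUTH) {
            p.xid = nextXid++;   // the xid is generated only now, at send time
            pendingQueue.add(p); // the reply carrying this xid is matched later
        }
        return p;
    }
}
```

Assigning xids at send time keeps them in the same order as the packets on the wire, which is what lets readResponse match each reply against the head of pendingQueue.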
ClientCnxn.Packet.createBB
The buffer holds a 4-byte length, then the serialized header (requestHeader.serialize(boa, "header")), then the request body (request.serialize(boa, "request")).
public void createBB() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
boa.writeInt(-1, "len"); // We'll fill this in later
// serialized with jute, ZooKeeper's own serialization format
if (requestHeader != null) {
requestHeader.serialize(boa, "header");
}
if (request instanceof ConnectRequest) {
request.serialize(boa, "connect");
// append "am-I-allowed-to-be-readonly" flag
boa.writeBool(readOnly, "readOnly");
} else if (request != null) {
request.serialize(boa, "request");
}
baos.close();
this.bb = ByteBuffer.wrap(baos.toByteArray());
this.bb.putInt(this.bb.capacity() - 4);
this.bb.rewind();
} catch (IOException e) {
LOG.warn("Ignoring unexpected exception", e);
}
}
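createBB writes a -1 placeholder for the length, serializes the body, then overwrites the first four bytes with capacity - 4. A minimal sketch of the same trick with java.io.DataOutputStream, which, like the DataOutputStream that jute's BinaryOutputArchive wraps, writes big-endian ints; the field layout (xid, type, length-prefixed path, watch flag) roughly follows an exists request and is an assumption for illustration. `Framing` is a hypothetical name:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper showing the length-placeholder trick used by Packet.createBB. */
final class Framing {
    private Framing() {
    }

    static ByteBuffer frame(int xid, int type, String path, boolean watch) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos); // big-endian
            out.writeInt(-1);        // "len" placeholder, patched below
            out.writeInt(xid);       // header: xid
            out.writeInt(type);      // header: type
            byte[] p = path.getBytes(StandardCharsets.UTF_8);
            out.writeInt(p.length);  // strings are length-prefixed
            out.write(p);
            out.writeBoolean(watch); // one byte
            out.close();
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            bb.putInt(bb.capacity() - 4); // the length excludes its own 4 bytes
            bb.rewind();                  // ready to be written from position 0
            return bb;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For path "/a" the frame is 19 bytes, and the length prefix says 15: everything after the prefix itself.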
EventThread.run
@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
try {
isRunning = true;
while (true) {
// take() blocks until an event is available
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId()));
}
EventThread.processEvent
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
} else {
// ...
}
} catch (Throwable t) {
LOG.error("Caught unexpected throwable", t);
}
}
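EventThread is a single consumer of a blocking queue, stopped by a sentinel object (eventOfDeath), and processEvent isolates each watcher so that one throwing watcher does not prevent the others from running. A simplified sketch; `MiniEventThread` is hypothetical, events are plain strings, and unlike the real thread it stops as soon as it takes the sentinel instead of first draining events queued after it:

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical miniature of ClientCnxn.EventThread with a poison-pill shutdown. */
final class MiniEventThread extends Thread {
    private static final Object EVENT_OF_DEATH = new Object(); // like eventOfDeath
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    private final List<Consumer<String>> watchers;

    MiniEventThread(List<Consumer<String>> watchers) {
        super("mini-event-thread");
        this.watchers = watchers;
    }

    void queueEvent(String event) {
        waitingEvents.add(event);
    }

    /** Events queued before the sentinel are still delivered, in FIFO order. */
    void queueEventOfDeath() {
        waitingEvents.add(EVENT_OF_DEATH);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Object event = waitingEvents.take(); // blocks until an event arrives
                if (event == EVENT_OF_DEATH) {
                    break;
                }
                for (Consumer<String> watcher : watchers) {
                    try {
                        watcher.accept((String) event);
                    } catch (RuntimeException e) {
                        // isolate failures, as processEvent does with catch (Throwable)
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and exit
        }
    }
}
```

Because a single thread delivers all events, a watcher that blocks inside process() delays every later notification for that client.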
ZooKeeper.exists
public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException{
// defaultWatcher was set from the constructor argument
return exists(path, watch ? watchManager.defaultWatcher : null);
}
ZooKeeper.exists (Watcher overload)
public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException {
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ExistsWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
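// the type is used later on the server to dispatch the request
h.setType(ZooDefs.OpCode.exists);
ExistsRequest request = new ExistsRequest();
request.setPath(serverPath); // server-side path (chroot prepended)
request.setWatch(watcher != null); // true when a watcher was supplied
SetDataResponse response = new SetDataResponse();
// cnxn was also initialized in the constructor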
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
if (r.getErr() == KeeperException.Code.NONODE.intValue()) {
return null;
}
throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
}
return response.getStat().getCzxid() == -1 ? null : response.getStat();
}
ClientCnxn.submitRequest
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration);
synchronized (packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}
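submitRequest turns the asynchronous packet pipeline into a blocking call: the caller waits on the packet's monitor until finishPacket sets finished and calls notifyAll (the cb == null branch). A minimal sketch of that handshake; `PendingReply` and `SyncCallDemo` are hypothetical, and a plain thread stands in for SendThread:

```java
/** Hypothetical stand-in for a Packet: a finished flag plus the reply, guarded by its monitor. */
final class PendingReply {
    private boolean finished;
    private String reply;

    /** Caller side, like ClientCnxn.submitRequest: block until the reply arrives. */
    synchronized String await() throws InterruptedException {
        while (!finished) { // a loop, not an if: wait() may wake up spuriously
            wait();
        }
        return reply;
    }

    /** I/O side, like finishPacket when p.cb == null: publish, then wake the waiter. */
    synchronized void finish(String reply) {
        this.reply = reply;
        finished = true;
        notifyAll();
    }
}

final class SyncCallDemo {
    private SyncCallDemo() {
    }

    /** Issues a "request" that another thread answers, as SendThread does for the caller. */
    static String call(String request) throws InterruptedException {
        PendingReply packet = new PendingReply();
        Thread sendThread = new Thread(() -> packet.finish("reply to " + request));
        sendThread.start();
        return packet.await();
    }
}
```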
NIOServerCnxnFactory.run
// when a client request arrives (create/delete/setData, ...), this is the method to start from
public void run() {
while (!ss.socket().isClosed()) {
try {
selector.select(1000);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
selected);
Collections.shuffle(selectedList);
for (SelectionKey k : selectedList) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
// ...
// read or write readiness
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected ops in select "+ k.readyOps());
}
}
}
selected.clear();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring exception", e);
}
}
closeAll();
LOG.info("NIOServerCnxn factory exited run method");
}
NIOServerCnxn.doIO
void doIO(SelectionKey k) throws InterruptedException {
try {
if (isSocketOpen() == false) {
LOG.warn("trying to do i/o on a null socket for session:0x"+ Long.toHexString(sessionId));
return;
}
if (k.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException("Unable to read additional data from client sessionid 0x"+ Long.toHexString(sessionId) + ", likely client has closed socket");
}
if (incomingBuffer.remaining() == 0) {
boolean isPayload;
if (incomingBuffer == lenBuffer) { // start of next request
incomingBuffer.flip();
isPayload = readLength(k);
incomingBuffer.clear();
} else {
// continuation
isPayload = true;
}
if (isPayload) { // not the case for 4letterword
readPayload();
}
else {
return;
}
}
}
if (k.isWritable()) {
if (outgoingBuffers.size() > 0) {
ByteBuffer directBuffer = factory.directBuffer;
directBuffer.clear();
for (ByteBuffer b : outgoingBuffers) {
if (directBuffer.remaining() < b.remaining()) {
b = (ByteBuffer) b.slice().limit(directBuffer.remaining());
}
int p = b.position();
directBuffer.put(b);
b.position(p);
if (directBuffer.remaining() == 0) {
break;
}
}
directBuffer.flip();
int sent = sock.write(directBuffer);
ByteBuffer bb;
// Remove the buffers that we have sent
while (outgoingBuffers.size() > 0) {
bb = outgoingBuffers.peek();
if (bb == ServerCnxnFactory.closeConn) {
throw new CloseRequestException("close requested");
}
int left = bb.remaining() - sent;
if (left > 0) {
bb.position(bb.position() + sent);
break;
}
packetSent();
sent -= bb.remaining();
outgoingBuffers.remove();
}
}
synchronized(this.factory){
if (outgoingBuffers.size() == 0) {
if (!initialized && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
throw new CloseRequestException("responded to info probe");
}
sk.interestOps(sk.interestOps() & (~SelectionKey.OP_WRITE));
} else {
sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
}
}
}
} catch (CancelledKeyException e) {
// ...
}
}
NIOServerCnxn.readPayload
private void readPayload() throws IOException, InterruptedException {
if (incomingBuffer.remaining() != 0) { // have we read length bytes?
int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
if (rc < 0) {
throw new EndOfStreamException("Unable to read additional data from client sessionid 0x"+ Long.toHexString(sessionId) + ", likely client has closed socket");
}
}
if (incomingBuffer.remaining() == 0) { // have we read length bytes?
packetReceived();
incomingBuffer.flip();
if (!initialized) {
readConnectRequest();
} else {
readRequest();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
}
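On both sides a frame is a 4-byte length followed by the payload, and the reader switches between lenBuffer and a payload buffer sized from that length, because a non-blocking read may return any number of bytes. A sketch of that state machine over in-memory chunks; `FrameDecoder` is hypothetical and, unlike readLength, it does not enforce a maximum frame size:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical decoder mirroring the lenBuffer / incomingBuffer switch. */
final class FrameDecoder {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer;
    final List<byte[]> frames = new ArrayList<>();

    /** Feeds the bytes returned by one non-blocking read; frames may span calls. */
    void feed(byte[] chunk) {
        ByteBuffer src = ByteBuffer.wrap(chunk);
        while (src.hasRemaining()) {
            // Copy only what fits into the buffer currently being filled.
            int n = Math.min(src.remaining(), incomingBuffer.remaining());
            ByteBuffer part = src.slice();
            part.limit(n);
            incomingBuffer.put(part);
            src.position(src.position() + n);
            if (incomingBuffer.remaining() == 0) {
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    int len = lenBuffer.getInt(); // like readLength()
                    lenBuffer.clear();
                    if (len < 0) {
                        throw new IllegalStateException("bad length " + len);
                    }
                    incomingBuffer = ByteBuffer.allocate(len); // sized from the prefix
                    if (len == 0) {
                        frames.add(new byte[0]);
                        incomingBuffer = lenBuffer;
                    }
                } else {
                    byte[] payload = new byte[incomingBuffer.remaining()];
                    incomingBuffer.get(payload); // like readRequest()
                    frames.add(payload);
                    incomingBuffer = lenBuffer;  // back to reading a length
                }
            }
        }
    }
}
```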
NIOServerCnxn.readRequest
private void readRequest() throws IOException {
zkServer.processPacket(this, incomingBuffer);
}
ZooKeeperServer.processPacket
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
// We have the request, now process and setup for next
InputStream bais = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
/**
* On the client, exists() built the header like this, so the type is OpCode.exists:
* RequestHeader h = new RequestHeader();
* h.setType(ZooDefs.OpCode.exists);
*/
RequestHeader h = new RequestHeader();
// deserialize the header
h.deserialize(bia, "header");
incomingBuffer = incomingBuffer.slice();
if (h.getType() == OpCode.auth) {
// ...
return;
} else {
if (h.getType() == OpCode.sasl) {
Record rsp = processSasl(incomingBuffer,cnxn);
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?
return;
} else {
// exists requests take this branch
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
h.getType(), incomingBuffer, cnxn.getAuthInfo());
si.setOwner(ServerCnxn.me);
submitRequest(si);
}
}
cnxn.incrOutstandingRequests(h);
}
ZooKeeperServer.submitRequest
public void submitRequest(Request si) {
if (firstProcessor == null) {
synchronized (this) {
try {
while (state == State.INITIAL) {
wait(1000);
}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption", e);
}
if (firstProcessor == null || state != State.RUNNING) {
throw new RuntimeException("Not started");
}
}
}
try {
touch(si.cnxn);
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
firstProcessor.processRequest(si);
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Received packet at server of unknown type " + si.type);
new UnimplementedRequestProcessor().processRequest(si);
}
} catch (MissingSessionException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Dropping request: " + e.getMessage());
}
} catch (RequestProcessorException e) {
LOG.error("Unable to process request:" + e.getMessage(), e);
}
}
ZooKeeperServer.setupRequestProcessors
protected void setupRequestProcessors() {
// chain of responsibility: PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
// start the SyncRequestProcessor thread
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
// start the PrepRequestProcessor thread
((PrepRequestProcessor)firstProcessor).start();
}
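Each threaded stage in the standalone chain drains its own queue: processRequest only enqueues, and run() does the work before calling the next stage. A hypothetical miniature of PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor, in which requests are strings and the stages only log what they see (no transaction building, disk logging, or response sending); `MiniProcessor`, `QueuedStage`, `FinalStage` and `Chains` are illustrative names:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for ZooKeeper's RequestProcessor interface. */
interface MiniProcessor {
    void processRequest(String request);
}

/** A queue-plus-thread stage, like PrepRequestProcessor and SyncRequestProcessor. */
final class QueuedStage extends Thread implements MiniProcessor {
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final String stageName;
    private final List<String> log;
    private final MiniProcessor next;

    QueuedStage(String stageName, List<String> log, MiniProcessor next) {
        super(stageName);
        setDaemon(true); // no shutdown protocol in this sketch
        this.stageName = stageName;
        this.log = log;
        this.next = next;
    }

    @Override
    public void processRequest(String request) {
        queue.add(request); // returns immediately; the stage thread does the work
    }

    @Override
    public void run() {
        try {
            while (true) {
                String request = queue.take();
                log.add(stageName + ":" + request);
                next.processRequest(request); // hand off to the next stage
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

/** No thread of its own: runs on the previous stage's thread, like FinalRequestProcessor. */
final class FinalStage implements MiniProcessor {
    private final List<String> log;
    private final CountDownLatch done;

    FinalStage(List<String> log, CountDownLatch done) {
        this.log = log;
        this.done = done;
    }

    @Override
    public void processRequest(String request) {
        log.add("final:" + request); // real code: apply txn or register watch, then reply
        done.countDown();
    }
}

final class Chains {
    private Chains() {
    }

    /** Built back to front, as in ZooKeeperServer.setupRequestProcessors. */
    static MiniProcessor standalone(List<String> log, CountDownLatch done) {
        MiniProcessor finalStage = new FinalStage(log, done);
        QueuedStage sync = new QueuedStage("sync", log, finalStage);
        sync.start();
        QueuedStage prep = new QueuedStage("prep", log, sync);
        prep.start();
        return prep; // plays the role of firstProcessor
    }
}
```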
FollowerZooKeeperServer.setupRequestProcessors
@Override
protected void setupRequestProcessors() {
// a follower has two chains:
// FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor
// SyncRequestProcessor -> SendAckRequestProcessor
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor,
Long.toString(getServerId()), true,
getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new SyncRequestProcessor(this,
new SendAckRequestProcessor((Learner)getFollower()));
syncProcessor.start();
}
LeaderZooKeeperServer.setupRequestProcessors
@Override
protected void setupRequestProcessors() {
// chain: PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor( finalProcessor, getLeader().toBeApplied);
commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
commitProcessor.start();
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
proposalProcessor.initialize();
firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
ObserverZooKeeperServer.setupRequestProcessors
@Override
protected void setupRequestProcessors() {
// chain: ObserverRequestProcessor -> CommitProcessor -> FinalRequestProcessor
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
((ObserverRequestProcessor) firstProcessor).start();
if (syncRequestProcessorEnabled) {
syncProcessor = new SyncRequestProcessor(this, null);
syncProcessor.start();
}
}
PrepRequestProcessor.processRequest
public void processRequest(Request request) {
// request.addRQRec(">prep="+zks.outstandingChanges.size());
submittedRequests.add(request);
}
PrepRequestProcessor.run
@Override
public void run() {
try {
while (true) {
Request request = submittedRequests.take();
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) { // heartbeat
traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
}
if (Request.requestOfDeath == request) {
break;
}
pRequest(request);
}
} catch (RequestProcessorException e) {
// ...
}
LOG.info("PrepRequestProcessor exited loop!");
}
PrepRequestProcessor.pRequest
protected void pRequest(Request request) throws RequestProcessorException {
request.hdr = null;
request.txn = null;
// ...
request.zxid = zks.getZxid();
nextProcessor.processRequest(request);
}
SyncRequestProcessor.processRequest
public void processRequest(Request request) {
// request.addRQRec(">sync");
queuedRequests.add(request);
}
SyncRequestProcessor.run
@Override
public void run() {
try {
int logCount = 0;
setRandRoll(r.nextInt(snapCount/2));
while (true) {
// ...
if (si != null) {
if (zks.getZKDatabase().append(si)) {
// ...
} else if (toFlush.isEmpty()) {
if (nextProcessor != null) {
nextProcessor.processRequest(si);
// ...
}
continue;
}
// ...
}
}
} catch (Throwable t) {
handleException(this.getName(), t);
running = false;
}
LOG.info("SyncRequestProcessor exited!");
}
FinalRequestProcessor.processRequest
public void processRequest(Request request) {
// ...
try {
// ...
switch (request.type) {
// ...
case OpCode.exists: {
lastOp = "EXIS";
// TODO we need to figure out the security requirement for this!
ExistsRequest existsRequest = new ExistsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
// path to stat; getWatch() says whether the client asked for a watch
String path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
}
// cnxn is the server-side connection; it is passed as the watcher when a watch was requested
Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);
rsp = new ExistsResponse(stat);
break;
}
// ...
}
} catch (SessionMovedException e) {
// ...
}
//...
try {
cnxn.sendResponse(hdr, rsp, "response");
if (closeSession) {
cnxn.sendCloseSession();
}
} catch (IOException e) {
LOG.error("FIXMSG",e);
}
}
ZKDatabase.statNode
public Stat statNode(String path, ServerCnxn serverCnxn) throws KeeperException.NoNodeException {
return dataTree.statNode(path, serverCnxn);
}
DataTree.statNode
public Stat statNode(String path, Watcher watcher) throws KeeperException.NoNodeException {
Stat stat = new Stat();
DataNode n = nodes.get(path);
if (watcher != null) {
dataWatches.addWatch(path, watcher);
}
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
n.copyStat(stat);
return stat;
}
}
WatchManager.addWatch
public synchronized void addWatch(String path, Watcher watcher) {
HashSet<Watcher> list = watchTable.get(path);
if (list == null) {
// the watcher here is actually the cnxn passed in earlier:
// zks.getZKDatabase().statNode(path, existsRequest .getWatch() ? cnxn : null)
list = new HashSet<Watcher>(4);
watchTable.put(path, list);
}
list.add(watcher);
HashSet<String> paths = watch2Paths.get(watcher);
if (paths == null) {
// cnxns typically have many watches, so use default cap here
paths = new HashSet<String>();
watch2Paths.put(watcher, paths);
}
paths.add(path);
}
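The server keeps two indexes: watchTable (path -> watchers) for firing events, and watch2Paths (watcher -> paths) so that all watches of a connection can be dropped when it closes, which WatchManager does in removeWatcher. A simplified generic sketch, where the type parameter W stands in for ServerCnxn; `ServerWatchTable` and `watchersOf` are hypothetical names:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical simplification of the server-side WatchManager; W stands in for ServerCnxn. */
final class ServerWatchTable<W> {
    private final Map<String, Set<W>> watchTable = new HashMap<>();  // path -> watchers
    private final Map<W, Set<String>> watch2Paths = new HashMap<>(); // watcher -> paths

    synchronized void addWatch(String path, W watcher) {
        // HashSet semantics: the same connection watching the same path counts once
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
        watch2Paths.computeIfAbsent(watcher, k -> new HashSet<>()).add(path);
    }

    /** On disconnect, the reverse index finds the connection's paths without a full scan. */
    synchronized void removeWatcher(W watcher) {
        Set<String> paths = watch2Paths.remove(watcher);
        if (paths == null) {
            return;
        }
        for (String path : paths) {
            Set<W> watchers = watchTable.get(path);
            if (watchers != null) {
                watchers.remove(watcher);
                if (watchers.isEmpty()) {
                    watchTable.remove(path);
                }
            }
        }
    }

    synchronized Set<W> watchersOf(String path) {
        return new HashSet<>(watchTable.getOrDefault(path, new HashSet<>()));
    }
}
```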
NIOServerCnxn.sendResponse
synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// Make space for length
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
try {
baos.write(fourBytes);
bos.writeRecord(h, "header");
if (r != null) {
bos.writeRecord(r, tag);
}
baos.close();
} catch (IOException e) {
LOG.error("Error serializing response");
}
byte b[] = baos.toByteArray();
ByteBuffer bb = ByteBuffer.wrap(b);
bb.putInt(b.length - 4).rewind();
sendBuffer(bb);
// ...
} catch(Exception e) {
LOG.warn("Unexpected exception. Destruction averted.", e);
}
}
ClientCnxnSocketNIO.doIO (read branch)
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
// ...
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount++;
readLength();
} else if (!initialized) {
// ...
} else {
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
SendThread.readResponse
void readResponse(ByteBuffer incomingBuffer) throws IOException {
// ...
try {
// ...
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
// deserialize the response body
packet.response.deserialize(bbia, "response");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reading reply sessionid:0x" + Long.toHexString(sessionId) + ", packet:: " + packet);
}
} finally {
finishPacket(packet);
}
}
ClientCnxn.finishPacket
private void finishPacket(Packet p) {
if (p.watchRegistration != null) {
// for exists(), watchRegistration is the ExistsWatchRegistration created in ZooKeeper.exists:
// wcb = new ExistsWatchRegistration(watcher, clientPath);
p.watchRegistration.register(p.replyHeader.getErr());
}
if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
eventThread.queuePacket(p);
}
}
WatchRegistration.register
public void register(int rc) {
if (shouldAddWatch(rc)) {
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized(watches) {
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
watchers.add(watcher);
}
}
}
ExistsWatchRegistration.getWatches
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
// the maps live in ZKWatchManager (key: path, value: custom watchers):
// private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
// private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
// private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
// rc == 0 means the node exists -> dataWatches; otherwise (NONODE) -> existWatches
return rc == 0 ? watchManager.dataWatches : watchManager.existWatches;
}
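On the client, ExistsWatchRegistration also overrides shouldAddWatch to accept NONODE as well as 0, so exists() on a missing node still leaves a watch (in existWatches) that a later create can fire. A sketch of that decision; `ClientWatches` is hypothetical, W stands in for Watcher, and the error codes are the numeric values of KeeperException.Code.OK and NONODE:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical miniature of ZKWatchManager's maps plus the ExistsWatchRegistration rules. */
final class ClientWatches<W> {
    static final int OK = 0;
    static final int NONODE = -101; // KeeperException.Code.NONODE

    final Map<String, Set<W>> dataWatches = new HashMap<>();
    final Map<String, Set<W>> existWatches = new HashMap<>();

    /** exists() keeps the watch both when the node exists and when it is missing. */
    static boolean shouldAddExistsWatch(int rc) {
        return rc == OK || rc == NONODE;
    }

    /** Mirrors WatchRegistration.register(rc), which finishPacket calls once the reply is in. */
    void registerExists(int rc, String clientPath, W watcher) {
        if (!shouldAddExistsWatch(rc)) {
            return; // e.g. connection loss: nothing is registered
        }
        // node present -> dataWatches; node missing -> existWatches
        Map<String, Set<W>> watches = rc == OK ? dataWatches : existWatches;
        synchronized (watches) {
            watches.computeIfAbsent(clientPath, k -> new HashSet<>()).add(watcher);
        }
    }
}
```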
EventThread.queuePacket
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void queuePacket(Packet packet) {
if (wasKilled) {
synchronized (waitingEvents) {
if (isRunning) waitingEvents.add(packet);
else processEvent(packet);
}
} else {
waitingEvents.add(packet);
}
}
FinalRequestProcessor.processRequest (transactional requests)
synchronized (zks.outstandingChanges) {
// ...
if (request.hdr != null) {
TxnHeader hdr = request.hdr;
Record txn = request.txn;
// a transactional request such as setData/delete/create
rc = zks.processTxn(hdr, txn);
}
// do not add non quorum packets to the queue.
if (Request.isQuorum(request.type)) {
zks.getZKDatabase().addCommittedProposal(request);
}
}
ZooKeeperServer.processTxn
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
ProcessTxnResult rc;
int opCode = hdr.getType();
long sessionId = hdr.getClientId();
rc = getZKDatabase().processTxn(hdr, txn);
if (opCode == OpCode.createSession) {
if (txn instanceof CreateSessionTxn) {
CreateSessionTxn cst = (CreateSessionTxn) txn;
sessionTracker.addSession(sessionId, cst.getTimeOut());
} else {
LOG.warn("*****>>>>> Got " + txn.getClass() + " " + txn.toString());
}
} else if (opCode == OpCode.closeSession) {
sessionTracker.removeSession(sessionId);
}
return rc;
}
ZKDatabase.processTxn
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
return dataTree.processTxn(hdr, txn);
}
DataTree.processTxn
public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
ProcessTxnResult rc = new ProcessTxnResult();
try {
// ...
switch (header.getType()) {
case OpCode.setData:
SetDataTxn setDataTxn = (SetDataTxn) txn;
rc.path = setDataTxn.getPath();
rc.stat = setData(setDataTxn.getPath(), setDataTxn.getData(), setDataTxn.getVersion(), header.getZxid(), header.getTime());
break;
// ...
}
} catch (KeeperException e) {
// ...
}
return rc;
}
DataTree.setData
public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException {
Stat s = new Stat();
DataNode n = nodes.get(path);
// ...
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
WatchManager.triggerWatch
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
// the watchers are removed from the map, which is why a watch fires only once
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {
// watch2Paths: key -> cnxn, value -> paths
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e);
}
return watchers;
}
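triggerWatch removes the path's watchers while holding the lock and notifies them after releasing it, which is why a server-side watch fires at most once. A simplified sketch; `OneShotWatches` is hypothetical, and the BiConsumer notifier stands in for NIOServerCnxn.process sending the xid = -1 notification:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

/** Hypothetical miniature of WatchManager.triggerWatch; W stands in for ServerCnxn. */
final class OneShotWatches<W> {
    private final Map<String, Set<W>> watchTable = new HashMap<>();
    private final Map<W, Set<String>> watch2Paths = new HashMap<>();

    synchronized void addWatch(String path, W watcher) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
        watch2Paths.computeIfAbsent(watcher, k -> new HashSet<>()).add(path);
    }

    /** Returns the notified watchers, or null when nobody was watching the path. */
    Set<W> triggerWatch(String path, String eventType, BiConsumer<W, String> notifier) {
        Set<W> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path); // removing the entry makes the watch one-shot
            if (watchers == null || watchers.isEmpty()) {
                return null;
            }
            for (W w : watchers) {
                Set<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path); // keep the reverse index consistent
                }
            }
        }
        // Notify outside the lock, as the original does.
        for (W w : watchers) {
            notifier.accept(w, eventType + " " + path);
        }
        return watchers;
    }
}
```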
NIOServerCnxn.process
synchronized public void process(WatchedEvent event) {
// note: xid = -1 and zxid = -1 mark this reply as a notification
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
// ...
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
// send the notification to the client
sendResponse(h, e, "notification");
}
SendThread.readResponse (notification branch)
void readResponse(ByteBuffer incomingBuffer) throws IOException {
// ...
if (replyHdr.getXid() == -1) {
// -1 means notification
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
// convert from a server path to a client path
if (chrootPath != null) {
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else {
LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + chrootPath);
}
}
WatchedEvent we = new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(sessionId));
}
eventThread.queueEvent( we );
return;
}
}
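readResponse checks a few reserved xids before treating a reply as the answer to the oldest pending request: -2 for pings, -4 for auth packets, and -1 for watch notifications, whose paths then go through the chroot conversion shown above. A hedged sketch of that routing and path conversion; `ReplyRouting` is hypothetical, and the ping and auth checks sit in the part of readResponse elided here:

```java
/** Hypothetical helper restating how readResponse routes replies and rewrites notification paths. */
final class ReplyRouting {
    private ReplyRouting() {
    }

    /** Classifies a reply by the xid in its ReplyHeader. */
    static String route(int xid) {
        if (xid == -2) {
            return "ping";         // answer to sendPing(): RequestHeader(-2, OpCode.ping)
        }
        if (xid == -4) {
            return "auth";         // answer to an auth packet
        }
        if (xid == -1) {
            return "notification"; // pushed by NIOServerCnxn.process: ReplyHeader(-1, -1L, 0)
        }
        return "response";         // must match the xid at the head of pendingQueue
    }

    /** Server path -> client path, as in the chroot block of readResponse. */
    static String toClientPath(String serverPath, String chrootPath) {
        if (chrootPath == null) {
            return serverPath;
        }
        if (serverPath.compareTo(chrootPath) == 0) {
            return "/";
        }
        if (serverPath.length() > chrootPath.length()) {
            return serverPath.substring(chrootPath.length());
        }
        return serverPath; // the original only logs a warning and keeps the path
    }
}
```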
EventThread.queueEvent
public void queueEvent(WatchedEvent event) {
if (event.getType() == EventType.None && sessionState == event.getState()) {
return;
}
sessionState = event.getState();
// materialize the watchers based on the event
WatcherSetEventPair pair = new WatcherSetEventPair(watcher.materialize(event.getState(), event.getType(), event.getPath()), event);
// queue the pair (watch set & event) for later processing
waitingEvents.add(pair);
}
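watcher.materialize in queueEvent decides which client watchers receive the event, and it removes them from ZKWatchManager's maps, so the client side is one-shot too: to keep watching, call exists or getData again with a watch. A simplified sketch covering only a few event types; `ClientMaterialize` is hypothetical, event types are strings, locking is omitted, and the None case is reduced to returning the default watcher:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified ZKWatchManager.materialize; W stands in for Watcher. */
final class ClientMaterialize<W> {
    final W defaultWatcher;
    final Map<String, Set<W>> dataWatches = new HashMap<>();
    final Map<String, Set<W>> existWatches = new HashMap<>();
    final Map<String, Set<W>> childWatches = new HashMap<>();

    ClientMaterialize(W defaultWatcher) {
        this.defaultWatcher = defaultWatcher;
    }

    /** Returns the watchers to notify and unregisters them (client-side one-shot). */
    Set<W> materialize(String eventType, String clientPath) {
        Set<W> result = new HashSet<>();
        switch (eventType) {
            case "None":
                // connection-state change; reduced here to the default watcher only
                result.add(defaultWatcher);
                break;
            case "NodeDataChanged":
            case "NodeCreated":
                addTo(dataWatches.remove(clientPath), result);
                addTo(existWatches.remove(clientPath), result);
                break;
            case "NodeChildrenChanged":
                addTo(childWatches.remove(clientPath), result);
                break;
            default:
                break; // other types (e.g. NodeDeleted) are omitted in this sketch
        }
        return result;
    }

    private static <T> void addTo(Set<T> from, Set<T> to) {
        if (from != null) {
            to.addAll(from);
        }
    }
}
```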
EventThread.run (revisited)
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
// ...
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
}
EventThread.processEvent (revisited)
private void processEvent(Object event) {
try {
// consume a watch event
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
// this is where the custom watcher is invoked
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
} else {
// consume a packet (async callback)
// ...
}
} catch (Throwable t) {
LOG.error("Caught unexpected throwable", t);
}
}