publicListener() { // During startup of thread, thread name will be overridden to // specific election address super("ListenerThread");
socketException = newAtomicBoolean(false);
// maximum retry count while trying to bind to election port // see ZOOKEEPER-3320 for more details finalIntegermaxRetry= Integer.getInteger(ELECTION_PORT_BIND_RETRY, DEFAULT_PORT_BIND_MAX_RETRY); if (maxRetry >= 0) { LOG.info("Election port bind maximum retries is {}", maxRetry == 0 ? "infinite" : maxRetry); portBindMaxRetry = maxRetry; } else { LOG.info( "'{}' contains invalid value: {}(must be >= 0). Use default value of {} instead.", ELECTION_PORT_BIND_RETRY, maxRetry, DEFAULT_PORT_BIND_MAX_RETRY); portBindMaxRetry = DEFAULT_PORT_BIND_MAX_RETRY; } }
finalExecutorServiceexecutor= Executors.newFixedThreadPool(addresses.size()); try { listenerHandlers.forEach(executor::submit); } finally { // prevent executor's threads to leak after ListenerHandler tasks complete executor.shutdown(); }
try { latch.await(); } catch (InterruptedException ie) { LOG.error("Interrupted while sleeping. Ignoring exception", ie); } finally { // Clean up for shutdown. for (ListenerHandler handler : listenerHandlers) { try { handler.close(); } catch (IOException ie) { // Don't log an error for shutdown. LOG.debug("Error closing server socket", ie); } } } }
LOG.info("Leaving listener"); if (!shutdown) { LOG.error( "As I'm leaving the listener thread, I won't be able to participate in leader election any longer: {}", self.getElectionAddress().getAllAddresses().stream() .map(NetUtils::formatInetAddr) .collect(Collectors.joining("|"))); if (socketException.get()) { // After leaving listener thread, the host cannot join the quorum anymore, // this is a severe error that we cannot recover from, so we need to exit socketBindErrorHandler.run(); } } }
4.3 ListenerHandler
ListenerHandler本质上是一个Runnable任务。从上面的代码可以看出,它被提交到线程池中异步执行,因此这里重点关注它的run()实现。
run()方法
该方法是ListenerHandler任务的执行入口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
@Override publicvoidrun() { try { Thread.currentThread().setName("ListenerHandler-" + address); acceptConnections(); // 负责接收投票端口的数据 try { close(); } catch (IOException e) { LOG.warn("Exception when shutting down listener: ", e); } } catch (Exception e) { // Output of unexpected exception, should never happen LOG.error("Unexpected error ", e); } finally { latch.countDown(); } }
// 节点的选票端口接收数据容错是有限制的 如果超过限制(max(3, 自定义配置次数)) 就主观判定这个服务器的选票端口有问题了 也就是退出了选主 while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) { try { /** * TCP协议的Socket * 选主用的是TCP协议 */ serverSocket = createNewServerSocket(); LOG.info("{} is accepting connections now, my election bind port: {}", QuorumCnxManager.this.mySid, address.toString()); while (!shutdown) { try { /** * 服务端被动Socket已经监听在了选票端口上 * 现在阻塞在accept 等待客户端的请求过来 * 有客户端连接进来 给服务端开辟个新的Socket 用来跟客户端通信 */ client = serverSocket.accept(); setSockOpts(client); LOG.info("Received connection request from {}", client.getRemoteSocketAddress()); // Receive and handle the connection request // asynchronously if the quorum sasl authentication is // enabled. This is required because sasl server // authentication process may take few seconds to finish, // this may delay next peer connection requests. if (quorumSaslAuthEnabled) { receiveConnectionAsync(client); } else { /** * 陷在receive的系统调用阻塞上 * 接收其他节点发过来的投票 */ receiveConnection(client); } numRetries = 0; } catch (SocketTimeoutException e) { LOG.warn("The socket is listening for the election accepted " + "and it timed out unexpectedly, but will retry." + "see ZOOKEEPER-2836"); } } } catch (IOException e) { if (shutdown) { break; }
LOG.error("Exception while listening to address {}", address, e);
if (e instanceof SocketException) { socketException.set(true); }
numRetries++; try { close(); Thread.sleep(1000); } catch (IOException ie) { LOG.error("Error closing server socket", ie); } catch (InterruptedException ie) { LOG.error("Interrupted while sleeping. Ignoring exception", ie); } closeSocket(client); } } if (!shutdown) { LOG.error( "Leaving listener thread for address {} after {} errors. Use {} property to increase retry count.", formatInetAddr(address), numRetries, ELECTION_PORT_BIND_RETRY); } }
} elseif (sid == self.getId()) { // 自己给自己发送的也是不合法的 // we saw this case in ZOOKEEPER-2164 LOG.warn("We got a connection request from a server with our own ID. " + "This should be either a configuration error, or a bug."); } else { // Otherwise start worker threads to receive data. // 大sid向小sid的一次通信 合法的 自己是服务端小sid 对方是客户端大sid SendWorkersw=newSendWorker(sock, sid); RecvWorkerrw=newRecvWorker(sock, din, sid, sw); sw.setRecv(rw);
/** * 负责发送数据 */ @Override publicvoidrun() { threadCnt.incrementAndGet(); try { /** * If there is nothing in the queue to send, then we * send the lastMessage to ensure that the last message * was received by the peer. The message could be dropped * in case self or the peer shutdown their connection * (and exit the thread) prior to reading/processing * the last message. Duplicate messages are handled correctly * by the peer. * * If the send queue is non-empty, then we have a recent * message than that stored in lastMessage. To avoid sending * stale message, we should send the message in the send queue. */ BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid); // 发送给谁 要发送的数据 if (bq == null || isSendQueueEmpty(bq)) { ByteBufferb= lastMessageSent.get(sid); if (b != null) { LOG.debug("Attempting to send lastMessage to sid={}", sid); send(b); } } } catch (IOException e) { LOG.error("Failed to send last message. Shutting down thread.", e); this.finish(); } LOG.debug("SendWorker thread started towards {}. myId: {}", sid, QuorumCnxManager.this.mySid);
ByteBufferb=null; try { BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid); // 要发送什么数据给谁 数据的存储结构是循环队列 if (bq != null) { b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS); // 超时去取数据 } else { LOG.error("No queue of incoming messages for server {}", sid); break; }
if (b != null) { lastMessageSent.put(sid, b); send(b); } } catch (InterruptedException e) { LOG.warn("Interrupted while waiting for message on queue", e); } } } catch (Exception e) { LOG.warn( "Exception when using channel: for id {} my id = {}", sid , QuorumCnxManager.this.mySid, e); } this.finish();
LOG.warn("Send worker leaving thread id {} my id = {}", sid, self.getId()); }