/** * 对请求的处理逻辑 * - 记录对应的事务日志 委托给ZK实例 ZK实例再委托给ZKDatabase组件 * - 内存数据打快照 委托为ZK实例 ZK实例再委托为FileTxnSnaplog组件 */ @Override publicvoidrun() { try { // we do this in an attempt to ensure that not all of the servers // in the ensemble take a snapshot at the same time resetSnapshotStats(); lastFlushTime = Time.currentElapsedTime(); while (true) { ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
longpollTime= Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay()); Requestsi= queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS); // 提交在服务端的一个请求 已经经过了PreRequestProcessor处理 当前要处理的请求 if (si == null) { // 其实这个防御使NPE没必要 因为在processRequest()方法中入队前的先决条件就是非null /* We timed out looking for more writes to batch, go ahead and flush immediately */ flush(); si = queuedRequests.take(); // 继续阻塞式从缓存中拿请求 }
// track the number of records written to the log if (!si.isThrottled() && zks.getZKDatabase().append(si)) { // 写事务日志 if (shouldSnapshot()) { // 内存数据打快照 resetSnapshotStats(); // roll the log zks.getZKDatabase().rollLog(); // take a snapshot if (!snapThreadMutex.tryAcquire()) { LOG.warn("Too busy to snap, skipping"); } else { newZooKeeperThread("Snapshot Thread") { publicvoidrun() { try { zks.takeSnapshot(); // 内存快照 } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { snapThreadMutex.release(); } } }.start(); } } } elseif (toFlush.isEmpty()) { // optimization for read heavy workloads // iff this is a read or a throttled request(which doesn't need to be written to the disk), // and there are no pending flushes (writes), then just pass this to the next processor if (nextProcessor != null) { nextProcessor.processRequest(si); if (nextProcessor instanceof Flushable) { ((Flushable) nextProcessor).flush(); } } continue; } toFlush.add(si); if (shouldFlush()) { flush(); } ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime); } } catch (Throwable t) { handleException(this.getName(), t); } LOG.info("SyncRequestProcessor exited!"); }