From dev-return-73486-archive-asf-public=cust-asf.ponee.io@zookeeper.apache.org Fri Sep 21 20:16:36 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 17145180671 for ; Fri, 21 Sep 2018 20:16:35 +0200 (CEST) Received: (qmail 84762 invoked by uid 500); 21 Sep 2018 18:16:33 -0000 Mailing-List: contact dev-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zookeeper.apache.org Delivered-To: mailing list dev@zookeeper.apache.org Received: (qmail 83556 invoked by uid 99); 21 Sep 2018 18:16:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Sep 2018 18:16:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B7F63E11D8; Fri, 21 Sep 2018 18:16:31 +0000 (UTC) From: lvfangmin To: dev@zookeeper.apache.org Reply-To: dev@zookeeper.apache.org References: In-Reply-To: Subject: [GitHub] zookeeper pull request #628: ZOOKEEPER-3140: Allow Followers to host Observe... Content-Type: text/plain Message-Id: <20180921181631.B7F63E11D8@git1-us-west.apache.org> Date: Fri, 21 Sep 2018 18:16:31 +0000 (UTC) Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/628#discussion_r219582907 --- Diff: src/java/main/org/apache/zookeeper/server/quorum/ObserverMaster.java --- @@ -0,0 +1,513 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import org.apache.zookeeper.jmx.MBeanRegistry; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.ZKDatabase; + +import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Used by Followers to host Observers. This reduces the network load on the Leader process by pushing + * the responsibility for keeping Observers in sync off the leading peer. + * + * It is expected that Observers will continue to perform the initial vetting of clients and requests. + * Observers send the request to the follower where it is received by an ObserverMaster. 
+ * + * The ObserverMaster forwards a copy of the request to the ensemble Leader and inserts it into its own + * request processor pipeline where it can be matched with the response comes back. All commits received + * from the Leader will be forwarded along to every Learner connected to the ObserverMaster. + * + * New Learners connecting to a Follower will receive a LearnerHandler object and be party to its syncing logic + * to be brought up to date. + * + * The logic is quite a bit simpler than the corresponding logic in Leader because it only hosts observers. + */ +public class ObserverMaster implements LearnerMaster, Runnable { + private static final Logger LOG = LoggerFactory.getLogger(ObserverMaster.class); + + //Follower counter + private final AtomicLong followerCounter = new AtomicLong(-1); + + private QuorumPeer self; + private FollowerZooKeeperServer zks; + private int port; + + private Set activeObservers = Collections.newSetFromMap(new ConcurrentHashMap()); + + private final ConcurrentHashMap connectionBeans = new ConcurrentHashMap<>(); + + /** + * we want to keep a log of past txns so that observers can sync up with us when we connect, + * but we can't keep everything in memory, so this limits how much memory will be dedicated + * to keeping recent txns. 
+ */ + private final static int PKTS_SIZE_LIMIT = 32 * 1024 * 1024; + private static volatile int pktsSizeLimit = Integer.getInteger("zookeeper.observerMaster.sizeLimit", PKTS_SIZE_LIMIT); + private ConcurrentLinkedQueue proposedPkts = new ConcurrentLinkedQueue<>(); + private ConcurrentLinkedQueue committedPkts = new ConcurrentLinkedQueue<>(); + private int pktsSize = 0; + + private long lastProposedZxid; + + // ensure ordering of revalidations returned to this learner + private final Object revalidateSessionLock = new Object(); + + // Throttle when there are too many concurrent snapshots being sent to observers + private static final String MAX_CONCURRENT_SNAPSHOTS = "zookeeper.leader.maxConcurrentSnapshots"; + private static final int maxConcurrentSnapshots; + + private static final String MAX_CONCURRENT_DIFFS = "zookeeper.leader.maxConcurrentDiffs"; + private static final int maxConcurrentDiffs; + static { + maxConcurrentSnapshots = Integer.getInteger(MAX_CONCURRENT_SNAPSHOTS, 10); + LOG.info(MAX_CONCURRENT_SNAPSHOTS + " = " + maxConcurrentSnapshots); + + maxConcurrentDiffs = Integer.getInteger(MAX_CONCURRENT_DIFFS, 100); + LOG.info(MAX_CONCURRENT_DIFFS + " = " + maxConcurrentDiffs); + } + + private final ConcurrentLinkedQueue pendingRevalidations = new ConcurrentLinkedQueue<>(); + static class Revalidation { + public final long sessionId; + public final int timeout; + public final LearnerHandler handler; + + Revalidation(final Long sessionId, final int timeout, final LearnerHandler handler) { + this.sessionId = sessionId; + this.timeout = timeout; + this.handler = handler; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + final Revalidation that = (Revalidation) o; + return sessionId == that.sessionId && timeout == that.timeout && handler.equals(that.handler); + } + + @Override + public int hashCode() { + int result = (int) (sessionId ^ (sessionId >>> 32)); + 
result = 31 * result + timeout; + result = 31 * result + handler.hashCode(); + return result; + } + } + + private final LearnerSnapshotThrottler learnerSnapshotThrottler = + new LearnerSnapshotThrottler(maxConcurrentSnapshots); + + private Thread thread; + private ServerSocket ss; + private boolean listenerRunning; + private ScheduledExecutorService pinger; + + Runnable ping = new Runnable() { + @Override + public void run() { + for (LearnerHandler lh: activeObservers) { + lh.ping(); + } + } + }; + + ObserverMaster(QuorumPeer self, FollowerZooKeeperServer zks, int port) { + this.self = self; + this.zks = zks; + this.port = port; + } + + @Override + public void addLearnerHandler(LearnerHandler learnerHandler) { + if (!listenerRunning) { + throw new RuntimeException(("ObserverMaster is not running")); + } + } + + @Override + public void removeLearnerHandler(LearnerHandler learnerHandler) { + activeObservers.remove(learnerHandler); + } + + @Override + public int syncTimeout() { + return self.getSyncLimit() * self.getTickTime(); + } + + @Override + public int getTickOfNextAckDeadline() { + return self.tick.get() + self.syncLimit; + } + + @Override + public int getTickOfInitialAckDeadline() { + return self.tick.get() + self.initLimit + self.syncLimit; + } + + @Override + public long getAndDecrementFollowerCounter() { + return followerCounter.getAndDecrement(); + } + + @Override + public void waitForEpochAck(long sid, StateSummary ss) throws IOException, InterruptedException { + // since this is done by an active follower, we don't need to wait for anything + } + + @Override + public LearnerSnapshotThrottler getLearnerSnapshotThrottler() { + return learnerSnapshotThrottler; + } + + @Override + public void waitForStartup() throws InterruptedException { + // since this is done by an active follower, we don't need to wait for anything + } + + @Override + synchronized public long getLastProposed() { + return lastProposedZxid; + } + + @Override + public long 
getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException { + return self.getCurrentEpoch(); + } + + @Override + public ZKDatabase getZKDatabase() { + return zks.getZKDatabase(); + } + + @Override + public void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException { + // no need to wait since we are a follower + } + + @Override + public int getCurrentTick() { + return self.tick.get(); + } + + @Override + public void processAck(long sid, long zxid, SocketAddress localSocketAddress) { + if ((zxid & 0xffffffffL) == 0) { + /* + * We no longer process NEWLEADER ack by this method. However, + * the learner sends ack back to the leader after it gets UPTODATE + * so we just ignore the message. + */ + return; + } + + throw new RuntimeException("Observers shouldn't send ACKS ack = " + Long.toHexString(zxid)); + } + + @Override + public void touch(long sess, int to) { + zks.getSessionTracker().touchSession(sess, to); + } + + boolean revalidateLearnerSession(QuorumPacket qp) throws IOException { + ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData()); + DataInputStream dis = new DataInputStream(bis); + long id = dis.readLong(); + boolean valid = dis.readBoolean(); + Iterator itr = pendingRevalidations.iterator(); + if (!itr.hasNext()) { + // not a learner session, handle locally + return false; + } + Revalidation revalidation = itr.next(); + if (revalidation.sessionId != id) { + // not a learner session, handle locally + return false; + } + itr.remove(); + LearnerHandler learnerHandler = revalidation.handler; + QuorumPacket deepCopy = new QuorumPacket(qp.getType(), qp.getZxid(), + Arrays.copyOf(qp.getData(), qp.getData().length), + qp.getAuthinfo() == null ? null : new ArrayList<>(qp.getAuthinfo())); + learnerHandler.queuePacket(deepCopy); + // To keep consistent as leader, touch the session when it's + // revalidating the session, only update if it's a valid session. 
+ if (valid) { + touch(revalidation.sessionId, revalidation.timeout); + } + return true; + } + + @Override + public void revalidateSession(QuorumPacket qp, LearnerHandler learnerHandler) throws IOException { + ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData()); + DataInputStream dis = new DataInputStream(bis); + long id = dis.readLong(); + int to = dis.readInt(); + synchronized (revalidateSessionLock) { + pendingRevalidations.add(new Revalidation(id, to, learnerHandler)); + Learner learner = zks.getLearner(); + if (learner != null) { + learner.writePacket(qp, true); + } + } + } + + @Override + public void submitLearnerRequest(Request si) { --- End diff -- Also add some comments here explaining why we add the request to the firstProcessor without going through validation (same as the leader), and why it goes through the whole request pipeline of this ObserverMaster (I think our decision at the time was to keep it simple and identical to the leader). Adding those comments will document the reasoning behind this design and make the review easier. ---