zookeeper-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lvfangmin <...@git.apache.org>
Subject [GitHub] zookeeper pull request #628: ZOOKEEPER-3140: Allow Followers to host Observe...
Date Fri, 21 Sep 2018 18:16:31 GMT
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/628#discussion_r219582907
  
    --- Diff: src/java/main/org/apache/zookeeper/server/quorum/ObserverMaster.java ---
    @@ -0,0 +1,513 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.quorum;
    +
    +import org.apache.zookeeper.jmx.MBeanRegistry;
    +import org.apache.zookeeper.server.Request;
    +import org.apache.zookeeper.server.ZKDatabase;
    +
    +import java.io.BufferedInputStream;
    +import java.io.ByteArrayInputStream;
    +import java.io.DataInputStream;
    +import java.io.IOException;
    +import java.net.ServerSocket;
    +import java.net.Socket;
    +import java.net.SocketAddress;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Used by Followers to host Observers. This reduces the network load on the Leader process by pushing
    + * the responsibility for keeping Observers in sync off the leading peer.
    + *
    + * It is expected that Observers will continue to perform the initial vetting of clients and requests.
    + * Observers send the request to the follower, where it is received by an ObserverMaster.
    + *
    + * The ObserverMaster forwards a copy of the request to the ensemble Leader and inserts it into its own
    + * request processor pipeline, where it can be matched with the response when it comes back. All commits
    + * received from the Leader will be forwarded along to every Learner connected to the ObserverMaster.
    + *
    + * New Learners connecting to a Follower will receive a LearnerHandler object and be party to its syncing
    + * logic to be brought up to date.
    + *
    + * The logic is quite a bit simpler than the corresponding logic in Leader because it only hosts observers.
    + */
    +public class ObserverMaster implements LearnerMaster, Runnable {
    +    private static final Logger LOG = LoggerFactory.getLogger(ObserverMaster.class);
    +
    +    // Source of the values handed out by getAndDecrementFollowerCounter(): starts at -1 and counts down.
    +    private final AtomicLong followerCounter = new AtomicLong(-1);
    +
    +    // Hosting peer, its follower server and the port passed in; assigned in the constructor.
    +    private QuorumPeer self;
    +    private FollowerZooKeeperServer zks;
    +    private int port;
    +
    +    // Handlers iterated by the ping Runnable and removed by removeLearnerHandler(). Backed by a
    +    // ConcurrentHashMap so it can be iterated while other threads modify it.
    +    private Set<LearnerHandler> activeObservers = Collections.newSetFromMap(new
ConcurrentHashMap<LearnerHandler,Boolean>());
    +
    +    // One LearnerHandlerBean per learner connection.
    +    // NOTE(review): not used in the visible code; presumably registered/unregistered with JMX
    +    // elsewhere in this class — confirm.
    +    private final ConcurrentHashMap<LearnerHandler, LearnerHandlerBean> connectionBeans
= new ConcurrentHashMap<>();
    +
    +    /**
    +     * We keep a log of past txns so that observers can sync up with us when they connect,
    +     * but we can't keep everything in memory, so this limits how much memory will be dedicated
    +     * to keeping recent txns. The default (32 * 1024 * 1024) can be overridden with the
    +     * zookeeper.observerMaster.sizeLimit system property, which is read once at class load.
    +     */
    +    private final static int PKTS_SIZE_LIMIT = 32 * 1024 * 1024;
    +    private static volatile int pktsSizeLimit = Integer.getInteger("zookeeper.observerMaster.sizeLimit",
PKTS_SIZE_LIMIT);
    +    // Retained proposals and commits, plus their accounted size.
    +    // NOTE(review): presumably pktsSize is kept below pktsSizeLimit by code outside the visible
    +    // section — confirm.
    +    private ConcurrentLinkedQueue<QuorumPacket> proposedPkts = new ConcurrentLinkedQueue<>();
    +    private ConcurrentLinkedQueue<QuorumPacket> committedPkts = new ConcurrentLinkedQueue<>();
    +    private int pktsSize = 0;
    +
    +    // Returned by the synchronized getLastProposed().
    +    private long lastProposedZxid;
    +
    +    // Held by revalidateSession() while it enqueues into pendingRevalidations and forwards the
    +    // packet, so the queue order matches the order in which revalidations are forwarded.
    +    private final Object revalidateSessionLock = new Object();
    +
    +    // Throttle when there are too many concurrent snapshots being sent to observers.
    +    // NOTE(review): these reuse the zookeeper.leader.* property names, presumably so the Leader's
    +    // settings apply here too — confirm this sharing is intended.
    +    private static final String MAX_CONCURRENT_SNAPSHOTS = "zookeeper.leader.maxConcurrentSnapshots";
    +    private static final int maxConcurrentSnapshots;
    +
    +    // NOTE(review): maxConcurrentDiffs is not referenced in the visible section.
    +    private static final String MAX_CONCURRENT_DIFFS = "zookeeper.leader.maxConcurrentDiffs";
    +    private static final int maxConcurrentDiffs;
    +    // Resolves both limits from system properties (defaults 10 and 100) and logs them once.
    +    // Static initializers run in textual order, so LOG must stay declared above this block.
    +    static {
    +        maxConcurrentSnapshots = Integer.getInteger(MAX_CONCURRENT_SNAPSHOTS, 10);
    +        LOG.info(MAX_CONCURRENT_SNAPSHOTS + " = " + maxConcurrentSnapshots);
    +
    +        maxConcurrentDiffs = Integer.getInteger(MAX_CONCURRENT_DIFFS, 100);
    +        LOG.info(MAX_CONCURRENT_DIFFS + " = " + maxConcurrentDiffs);
    +    }
    +
    +    // Revalidations forwarded on behalf of learners, oldest first. revalidateLearnerSession()
    +    // only ever matches a reply against the head of this queue.
    +    private final ConcurrentLinkedQueue<Revalidation> pendingRevalidations = new
ConcurrentLinkedQueue<>();
    +    /**
    +     * A session revalidation that was forwarded on behalf of a connected learner, remembered so
    +     * that the reply can be routed back to the requesting {@link LearnerHandler}.
    +     *
    +     * Instances are queued in pendingRevalidations in the order they were forwarded.
    +     */
    +    static class Revalidation {
    +        public final long sessionId;
    +        public final int timeout;
    +        public final LearnerHandler handler;
    +
    +        /**
    +         * @param sessionId id of the session being revalidated
    +         * @param timeout   session timeout as read from the revalidation request
    +         * @param handler   learner connection that asked for the revalidation
    +         */
    +        // Takes a primitive long: the field is primitive, so a boxed Long only added an unboxing
    +        // step and a hidden NullPointerException. Callers passing a Long still compile (auto-unboxing).
    +        Revalidation(final long sessionId, final int timeout, final LearnerHandler handler) {
    +            this.sessionId = sessionId;
    +            this.timeout = timeout;
    +            this.handler = handler;
    +        }
    +
    +        @Override
    +        public boolean equals(Object o) {
    +            if (this == o) {
    +                return true;
    +            }
    +            if (o == null || getClass() != o.getClass()) {
    +                return false;
    +            }
    +
    +            final Revalidation that = (Revalidation) o;
    +            return sessionId == that.sessionId
    +                    && timeout == that.timeout
    +                    && handler.equals(that.handler);
    +        }
    +
    +        @Override
    +        public int hashCode() {
    +            // Same folding of the 64-bit id as Long#hashCode, combined with the other fields.
    +            int result = (int) (sessionId ^ (sessionId >>> 32));
    +            result = 31 * result + timeout;
    +            result = 31 * result + handler.hashCode();
    +            return result;
    +        }
    +    }
    +
    +    // Caps concurrent snapshot transfers to learners at maxConcurrentSnapshots.
    +    private final LearnerSnapshotThrottler learnerSnapshotThrottler
    +            = new LearnerSnapshotThrottler(maxConcurrentSnapshots);
    +
    +    // Listener state. NOTE(review): managed outside the visible section; listenerRunning is
    +    // checked by addLearnerHandler().
    +    private Thread thread;
    +    private ServerSocket ss;
    +    private boolean listenerRunning;
    +    private ScheduledExecutorService pinger;
    +
    +    // Sends a ping through every handler currently in activeObservers.
    +    Runnable ping = new Runnable() {
    +        @Override
    +        public void run() {
    +            Iterator<LearnerHandler> handlers = activeObservers.iterator();
    +            while (handlers.hasNext()) {
    +                handlers.next().ping();
    +            }
    +        }
    +    };
    +
    +    /**
    +     * Creates an ObserverMaster hosted by a follower.
    +     *
    +     * @param self the local quorum peer (source of tick, limits and epoch)
    +     * @param zks  the follower's server, whose database and session tracker are used
    +     * @param port port number stored for use outside the visible section
    +     */
    +    ObserverMaster(QuorumPeer self, FollowerZooKeeperServer zks, int port) {
    +        this.port = port;
    +        this.zks = zks;
    +        this.self = self;
    +    }
    +
    +    /**
    +     * Called for a new learner connection; only verifies that the listener is running.
    +     *
    +     * The handler itself is not recorded here.
    +     * NOTE(review): activeObservers is only removed from in the visible code; presumably
    +     * handlers are added to it elsewhere in this class — confirm.
    +     *
    +     * @param learnerHandler the new learner connection
    +     * @throws IllegalStateException if the listener is not running
    +     */
    +    @Override
    +    public void addLearnerHandler(LearnerHandler learnerHandler) {
    +        if (!listenerRunning) {
    +            // IllegalStateException extends RuntimeException, so existing catch sites still match.
    +            throw new IllegalStateException("ObserverMaster is not running");
    +        }
    +    }
    +
    +    /** Stops tracking the given handler, so the ping Runnable no longer reaches it. */
    +    @Override
    +    public void removeLearnerHandler(LearnerHandler learnerHandler) {
    +        this.activeObservers.remove(learnerHandler);
    +    }
    +
    +    /** Returns the peer's syncLimit multiplied by its tickTime. */
    +    @Override
    +    public int syncTimeout() {
    +        final int limitInTicks = self.getSyncLimit();
    +        return limitInTicks * self.getTickTime();
    +    }
    +
    +    /** Returns the tick by which the next ack is due: current tick plus syncLimit. */
    +    @Override
    +    public int getTickOfNextAckDeadline() {
    +        final int now = self.tick.get();
    +        return now + self.syncLimit;
    +    }
    +
    +    /** Returns the tick by which the first ack is due: current tick plus initLimit plus syncLimit. */
    +    @Override
    +    public int getTickOfInitialAckDeadline() {
    +        final int now = self.tick.get();
    +        return now + self.initLimit + self.syncLimit;
    +    }
    +
    +    /** Returns the current follower counter value, then decrements it. */
    +    @Override
    +    public long getAndDecrementFollowerCounter() {
    +        return this.followerCounter.getAndDecrement();
    +    }
    +
    +    @Override
    +    public void waitForEpochAck(long sid, StateSummary ss) throws IOException, InterruptedException {
    +        // Nothing to wait for: this runs on an already-active follower.
    +    }
    +
    +    /** Returns the throttler that limits concurrent snapshot transfers to learners. */
    +    @Override
    +    public LearnerSnapshotThrottler getLearnerSnapshotThrottler() {
    +        return this.learnerSnapshotThrottler;
    +    }
    +
    +    @Override
    +    public void waitForStartup() throws InterruptedException {
    +        // Nothing to wait for: this runs on an already-active follower.
    +    }
    +
    +    /** Returns lastProposedZxid, read while holding this object's monitor. */
    +    @Override
    +    public synchronized long getLastProposed() {
    +        return this.lastProposedZxid;
    +    }
    +
    +    /** Proposes this peer's current epoch unchanged; no negotiation happens here. */
    +    @Override
    +    public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
    +        final long currentEpoch = self.getCurrentEpoch();
    +        return currentEpoch;
    +    }
    +
    +    /** Returns the hosting follower server's database. */
    +    @Override
    +    public ZKDatabase getZKDatabase() {
    +        return this.zks.getZKDatabase();
    +    }
    +
    +    @Override
    +    public void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException {
    +        // Nothing to wait for: this server is a follower, not the leader.
    +    }
    +
    +    /** Returns the peer's current tick. */
    +    @Override
    +    public int getCurrentTick() {
    +        return this.self.tick.get();
    +    }
    +
    +    /**
    +     * Handles an ACK received from a connected learner.
    +     *
    +     * ACKs whose zxid has a zero counter (low 32 bits) are ignored. NEWLEADER acks are no longer
    +     * processed here, and the learner sends such an ack after it gets UPTODATE. Any other ACK is
    +     * rejected, because observers are not expected to ack proposals.
    +     *
    +     * @param sid                id of the sending server (unused)
    +     * @param zxid               zxid being acknowledged
    +     * @param localSocketAddress local address of the connection (unused)
    +     * @throws IllegalStateException for any ACK whose zxid counter is non-zero
    +     */
    +    @Override
    +    public void processAck(long sid, long zxid, SocketAddress localSocketAddress) {
    +        if ((zxid & 0xffffffffL) == 0) {
    +            // Post-UPTODATE ack from the learner: nothing to do.
    +            return;
    +        }
    +
    +        // IllegalStateException extends RuntimeException, so existing catch sites still match.
    +        throw new IllegalStateException("Observers shouldn't send ACKS ack = " + Long.toHexString(zxid));
    +    }
    +
    +    /**
    +     * Refreshes a session in the hosting server's session tracker.
    +     *
    +     * @param sess session id
    +     * @param to   session timeout passed through to the tracker
    +     */
    +    @Override
    +    public void touch(long sess, int to) {
    +        this.zks.getSessionTracker().touchSession(sess, to);
    +    }
    +
    +    /**
    +     * Routes a session revalidation reply back to the learner that requested it, if any.
    +     *
    +     * The reply is matched only against the oldest pending revalidation. When nothing is pending,
    +     * or the head's session id differs, the reply is not a learner's and false is returned so the
    +     * caller can handle it locally.
    +     *
    +     * @param qp reply whose data starts with the session id (long) followed by a validity flag (boolean)
    +     * @return true if the reply was queued to a learner, false if it should be handled locally
    +     * @throws IOException if the packet data cannot be decoded
    +     */
    +    boolean revalidateLearnerSession(QuorumPacket qp) throws IOException {
    +        byte[] payload = qp.getData();
    +        DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
    +        long sessionId = in.readLong();
    +        boolean valid = in.readBoolean();
    +
    +        Iterator<Revalidation> pending = pendingRevalidations.iterator();
    +        Revalidation head = pending.hasNext() ? pending.next() : null;
    +        if (head == null || head.sessionId != sessionId) {
    +            // not a learner session, handle locally
    +            return false;
    +        }
    +        pending.remove();
    +
    +        // Queue a deep copy rather than qp itself (presumably so the caller may keep using qp).
    +        QuorumPacket copy = new QuorumPacket(qp.getType(), qp.getZxid(),
    +                Arrays.copyOf(payload, payload.length),
    +                qp.getAuthinfo() == null ? null : new ArrayList<>(qp.getAuthinfo()));
    +        head.handler.queuePacket(copy);
    +
    +        // Mirror the leader: touch the session while revalidating it, but only when it is valid.
    +        if (valid) {
    +            touch(head.sessionId, head.timeout);
    +        }
    +        return true;
    +    }
    +
    +    /**
    +     * Forwards a learner's session revalidation request and remembers which learner asked.
    +     *
    +     * The pending entry is queued and the packet written while holding revalidateSessionLock, so the
    +     * order of pendingRevalidations matches the order in which requests are forwarded.
    +     * NOTE(review): when no Learner is available (or writePacket throws) the entry stays queued
    +     * without being forwarded, and would then sit at the head that revalidateLearnerSession()
    +     * compares against — confirm this cannot outlive the instance.
    +     *
    +     * @param qp             request whose data starts with the session id (long) and timeout (int)
    +     * @param learnerHandler the learner connection that sent the request
    +     * @throws IOException if the packet cannot be decoded or written
    +     */
    +    @Override
    +    public void revalidateSession(QuorumPacket qp, LearnerHandler learnerHandler) throws IOException {
    +        DataInputStream in = new DataInputStream(new ByteArrayInputStream(qp.getData()));
    +        long sessionId = in.readLong();
    +        int timeout = in.readInt();
    +        synchronized (revalidateSessionLock) {
    +            pendingRevalidations.add(new Revalidation(sessionId, timeout, learnerHandler));
    +            Learner upstream = zks.getLearner();
    +            if (upstream == null) {
    +                return;
    +            }
    +            upstream.writePacket(qp, true);
    +        }
    +    }
    +
    +    @Override
    +    public void submitLearnerRequest(Request si) {
    --- End diff --
    
    Also add some comments here explaining why we add the request to the firstProcessor without
going through validation (same as the leader), and why it goes through the whole pipeline of
this ObserverMaster (I think our decision at the time was to keep it simple and identical to
the leader).
    
    Adding those comments will document the reasoning behind this and make the review easier.


---

Mime
View raw message