zookeeper-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lvfangmin <...@git.apache.org>
Subject [GitHub] zookeeper pull request #628: ZOOKEEPER-3140: Allow Followers to host Observe...
Date Thu, 06 Dec 2018 19:16:28 GMT
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/628#discussion_r239578960
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java
---
    @@ -0,0 +1,514 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.quorum;
    +
    +import org.apache.zookeeper.jmx.MBeanRegistry;
    +import org.apache.zookeeper.server.Request;
    +import org.apache.zookeeper.server.ZKDatabase;
    +
    +import java.io.BufferedInputStream;
    +import java.io.ByteArrayInputStream;
    +import java.io.DataInputStream;
    +import java.io.IOException;
    +import java.net.ServerSocket;
    +import java.net.Socket;
    +import java.net.SocketAddress;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Used by Followers to host Observers. This reduces the network load on the Leader process
by pushing
    + * the responsibility for keeping Observers in sync off the leading peer.
    + *
    + * It is expected that Observers will continue to perform the initial vetting of clients
and requests.
    + * Observers send the request to the follower where it is received by an ObserverMaster.
    + *
    + * The ObserverMaster forwards a copy of the request to the ensemble Leader and inserts it into its own
    + * request processor pipeline, where it can be matched with the response when it comes back. All commits received
    + * from the Leader will be forwarded along to every Learner connected to the ObserverMaster.
    + *
    + * New Learners connecting to a Follower will receive a LearnerHandler object and be
party to its syncing logic
    + * to be brought up to date.
    + *
    + * The logic is quite a bit simpler than the corresponding logic in Leader because it
only hosts observers.
    + */
    +public class ObserverMaster implements LearnerMaster, Runnable {
    +    private static final Logger LOG = LoggerFactory.getLogger(ObserverMaster.class);
    +
    +    //Follower counter
    +    private final AtomicLong followerCounter = new AtomicLong(-1);
    +
    +    private QuorumPeer self;
    +    private FollowerZooKeeperServer zks;
    +    private int port;
    +
    +    private Set<LearnerHandler> activeObservers = Collections.newSetFromMap(new
ConcurrentHashMap<LearnerHandler,Boolean>());
    +
    +    private final ConcurrentHashMap<LearnerHandler, LearnerHandlerBean> connectionBeans
= new ConcurrentHashMap<>();
    +
    +    /**
    +     * We want to keep a log of past txns so that observers can sync up with us when they connect,
    +     * but we can't keep everything in memory, so this limits how much memory will be dedicated
    +     * to keeping recent txns.
    +     */
    +    private final static int PKTS_SIZE_LIMIT = 32 * 1024 * 1024;
    +    private static volatile int pktsSizeLimit = Integer.getInteger("zookeeper.observerMaster.sizeLimit",
PKTS_SIZE_LIMIT);
    +    private ConcurrentLinkedQueue<QuorumPacket> proposedPkts = new ConcurrentLinkedQueue<>();
    +    private ConcurrentLinkedQueue<QuorumPacket> committedPkts = new ConcurrentLinkedQueue<>();
    +    private int pktsSize = 0;
    +
    +    private long lastProposedZxid;
    +
    +    // ensure ordering of revalidations returned to this learner
    +    private final Object revalidateSessionLock = new Object();
    +
    +    // Throttle when there are too many concurrent snapshots being sent to observers
    +    private static final String MAX_CONCURRENT_SNAPSHOTS = "zookeeper.leader.maxConcurrentSnapshots";
    +    private static final int maxConcurrentSnapshots;
    +
    +    private static final String MAX_CONCURRENT_DIFFS = "zookeeper.leader.maxConcurrentDiffs";
    +    private static final int maxConcurrentDiffs;
    +    static {
    +        maxConcurrentSnapshots = Integer.getInteger(MAX_CONCURRENT_SNAPSHOTS, 10);
    +        LOG.info(MAX_CONCURRENT_SNAPSHOTS + " = " + maxConcurrentSnapshots);
    +
    +        maxConcurrentDiffs = Integer.getInteger(MAX_CONCURRENT_DIFFS, 100);
    +        LOG.info(MAX_CONCURRENT_DIFFS + " = " + maxConcurrentDiffs);
    +    }
    +
    +    private final ConcurrentLinkedQueue<Revalidation> pendingRevalidations = new
ConcurrentLinkedQueue<>();
    +    static class Revalidation {
    +        public final long sessionId;
    +        public final int timeout;
    +        public final LearnerHandler handler;
    +
    +        Revalidation(final Long sessionId, final int timeout, final LearnerHandler handler)
{
    +            this.sessionId = sessionId;
    +            this.timeout = timeout;
    +            this.handler = handler;
    +        }
    +
    +        @Override
    +        public boolean equals(Object o) {
    +            if (this == o) return true;
    +            if (o == null || getClass() != o.getClass()) return false;
    +
    +            final Revalidation that = (Revalidation) o;
    +            return sessionId == that.sessionId && timeout == that.timeout &&
handler.equals(that.handler);
    +        }
    +
    +        @Override
    +        public int hashCode() {
    +            int result = (int) (sessionId ^ (sessionId >>> 32));
    +            result = 31 * result + timeout;
    +            result = 31 * result + handler.hashCode();
    +            return result;
    +        }
    +    }
    +
    +    private final LearnerSnapshotThrottler learnerSnapshotThrottler =
    +            new LearnerSnapshotThrottler(maxConcurrentSnapshots);
    +
    +    private Thread thread;
    +    private ServerSocket ss;
    +    private boolean listenerRunning;
    +    private ScheduledExecutorService pinger;
    +
    +    Runnable ping = new Runnable() {
    +        @Override
    +        public void run() {
    +            for (LearnerHandler lh: activeObservers) {
    +                lh.ping();
    +            }
    +        }
    +    };
    +
    +    ObserverMaster(QuorumPeer self, FollowerZooKeeperServer zks, int port) {
    +        this.self = self;
    +        this.zks = zks;
    +        this.port = port;
    +    }
    +
    +    @Override
    +    public void addLearnerHandler(LearnerHandler learnerHandler) {
    +        if (!listenerRunning) {
    +            throw new RuntimeException(("ObserverMaster is not running"));
    +        }
    +    }
    +
    +    @Override
    +    public void removeLearnerHandler(LearnerHandler learnerHandler) {
    +        activeObservers.remove(learnerHandler);
    +    }
    +
    +    @Override
    +    public int syncTimeout() {
    +        return self.getSyncLimit() * self.getTickTime();
    +    }
    +
    +    @Override
    +    public int getTickOfNextAckDeadline() {
    +        return self.tick.get() + self.syncLimit;
    +    }
    +
    +    @Override
    +    public int getTickOfInitialAckDeadline() {
    +        return self.tick.get() + self.initLimit + self.syncLimit;
    +    }
    +
    +    @Override
    +    public long getAndDecrementFollowerCounter() {
    +        return followerCounter.getAndDecrement();
    +    }
    +
    +    @Override
    +    public void waitForEpochAck(long sid, StateSummary ss) throws IOException, InterruptedException
{
    +        // since this is done by an active follower, we don't need to wait for anything
    +    }
    +
    +    @Override
    +    public LearnerSnapshotThrottler getLearnerSnapshotThrottler() {
    +        return learnerSnapshotThrottler;
    +    }
    +
    +    @Override
    +    public void waitForStartup() throws InterruptedException {
    +        // since this is done by an active follower, we don't need to wait for anything
    +    }
    +
    +    @Override
    +    synchronized public long getLastProposed() {
    +        return lastProposedZxid;
    +    }
    +
    +    @Override
    +    public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException,
IOException {
    +        return self.getCurrentEpoch();
    +    }
    +
    +    @Override
    +    public ZKDatabase getZKDatabase() {
    +        return zks.getZKDatabase();
    +    }
    +
    +    @Override
    +    public void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException
{
    +        // no need to wait since we are a follower
    +    }
    +
    +    @Override
    +    public int getCurrentTick() {
    +        return self.tick.get();
    +    }
    +
    +    @Override
    +    public void processAck(long sid, long zxid, SocketAddress localSocketAddress) {
    +        if ((zxid & 0xffffffffL) == 0) {
    +            /*
    +             * We no longer process NEWLEADER ack by this method. However,
    +             * the learner sends ack back to the leader after it gets UPTODATE
    +             * so we just ignore the message.
    +             */
    +            return;
    +        }
    +
    +        throw new RuntimeException("Observers shouldn't send ACKS ack = " + Long.toHexString(zxid));
    +    }
    +
    +    @Override
    +    public void touch(long sess, int to) {
    +        zks.getSessionTracker().touchSession(sess, to);
    +    }
    +
    +    boolean revalidateLearnerSession(QuorumPacket qp) throws IOException {
    +        ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
    +        DataInputStream dis = new DataInputStream(bis);
    +        long id = dis.readLong();
    +        boolean valid = dis.readBoolean();
    +        Iterator<Revalidation> itr = pendingRevalidations.iterator();
    +        if (!itr.hasNext()) {
    +            // not a learner session, handle locally
    +            return false;
    +        }
    +        Revalidation revalidation = itr.next();
    +        if (revalidation.sessionId != id) {
    +            // not a learner session, handle locally
    +            return false;
    +        }
    +        itr.remove();
    +        LearnerHandler learnerHandler = revalidation.handler;
    +        // create a copy here as the qp object is reused by the Follower and may be mutated
    +        QuorumPacket deepCopy = new QuorumPacket(qp.getType(), qp.getZxid(),
    +                Arrays.copyOf(qp.getData(), qp.getData().length),
    +                qp.getAuthinfo() == null ? null : new ArrayList<>(qp.getAuthinfo()));
    +        learnerHandler.queuePacket(deepCopy);
    +        // To stay consistent with the leader's behavior, touch the session
    +        // while revalidating it, but only if the session is valid.
    +        if (valid) {
    +            touch(revalidation.sessionId, revalidation.timeout);
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public void revalidateSession(QuorumPacket qp, LearnerHandler learnerHandler) throws
IOException {
    +        ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
    +        DataInputStream dis = new DataInputStream(bis);
    +        long id = dis.readLong();
    +        int to = dis.readInt();
    +        synchronized (revalidateSessionLock) {
    +            pendingRevalidations.add(new Revalidation(id, to, learnerHandler));
    +            Learner learner = zks.getLearner();
    +            if (learner != null) {
    +                learner.writePacket(qp, true);
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void submitLearnerRequest(Request si) {
    +        zks.processObserverRequest(si);
    +    }
    +
    +    @Override
    +    synchronized public long startForwarding(LearnerHandler learnerHandler, long lastSeenZxid)
{
    +        Iterator<QuorumPacket> itr = committedPkts.iterator();
    +        if (itr.hasNext()) {
    +            QuorumPacket packet = itr.next();
    +            if (packet.getZxid() > lastSeenZxid + 1) {
    +                LOG.error("LearnerHandler is too far behind ({} < {}), disconnecting
{} at {}", Long.toHexString(lastSeenZxid + 1),
    +                        Long.toHexString(packet.getZxid()), learnerHandler.getSid(),
learnerHandler.getRemoteAddress());
    +                learnerHandler.shutdown();
    +                return -1;
    +            } else if (packet.getZxid() == lastSeenZxid + 1) {
    +                learnerHandler.queuePacket(packet);
    +            }
    +            long queueHeadZxid = packet.getZxid();
    +            long queueBytesUsed = LearnerHandler.packetSize(packet);
    +            while (itr.hasNext()) {
    +                packet = itr.next();
    +                if (packet.getZxid() <= lastSeenZxid) {
    +                    continue;
    +                }
    +                learnerHandler.queuePacket(packet);
    +                queueBytesUsed += LearnerHandler.packetSize(packet);
    +            }
    +            LOG.info("finished syncing observer from retained commit queue: sid {}, "
+
    +                            "queue head 0x{}, queue tail 0x{}, sync position 0x{}, num
packets used {}, " +
    +                            "num bytes used {}",
    +                    learnerHandler.getSid(),
    +                    Long.toHexString(queueHeadZxid),
    +                    Long.toHexString(packet.getZxid()),
    +                    Long.toHexString(lastSeenZxid),
    +                    packet.getZxid() - lastSeenZxid,
    +                    queueBytesUsed);
    +        }
    +        activeObservers.add(learnerHandler);
    +        return lastProposedZxid;
    +    }
    +
    +    @Override
    +    public long getQuorumVerifierVersion() {
    +        return self.getQuorumVerifier().getVersion();
    +    }
    +
    +    @Override
    +    public String getPeerInfo(long sid) {
    +        QuorumPeer.QuorumServer server = self.getView().get(sid);
    +        return server == null ? "" : server.toString();
    +    }
    +
    +    @Override
    +    public byte[] getQuorumVerifierBytes() {
    +        return self.getLastSeenQuorumVerifier().toString().getBytes();
    +    }
    +
    +    @Override
    +    public QuorumAuthServer getQuorumAuthServer() {
    +        return (self == null) ? null : self.authServer;
    +    }
    +
    +    void proposalReceived(QuorumPacket qp) {
    +        proposedPkts.add(new QuorumPacket(Leader.INFORM, qp.getZxid(), qp.getData(),
null));
    +    }
    +
    +    private synchronized QuorumPacket removeProposedPacket(long zxid) {
    +        QuorumPacket pkt = proposedPkts.peek();
    +        if (pkt == null || pkt.getZxid() > zxid) {
    +            LOG.debug("ignore missing proposal packet for {}", Long.toHexString(zxid));
    +            return null;
    +        }
    +        if (pkt.getZxid() != zxid) {
    +            final String m = String.format("Unexpected proposal packet on commit ack,
expected zxid 0x%d got zxid 0x%d",
    +                    zxid, pkt.getZxid());
    +            LOG.error(m);
    +            throw new RuntimeException(m);
    +        }
    +        proposedPkts.remove();
    +        return pkt;
    +    }
    +
    +    private synchronized void cacheCommittedPacket(final QuorumPacket pkt) {
    +        committedPkts.add(pkt);
    +        pktsSize += LearnerHandler.packetSize(pkt);
    +        // remove 5 packets for every one added as we near the size limit
    +        for (int i = 0; pktsSize > pktsSizeLimit * 0.8  && i < 5; i++)
{
    +            QuorumPacket oldPkt = committedPkts.poll();
    +            if (oldPkt == null) {
    +                pktsSize = 0;
    +                break;
    +            }
    +            pktsSize -= LearnerHandler.packetSize(oldPkt);
    +        }
    +        // enforce the size limit as a hard cap
    +        while (pktsSize > pktsSizeLimit) {
    +            QuorumPacket oldPkt = committedPkts.poll();
    +            if (oldPkt == null) {
    +                pktsSize = 0;
    +                break;
    +            }
    +            pktsSize -= LearnerHandler.packetSize(oldPkt);
    +        }
    +    }
    +
    +    private synchronized void sendPacket(final QuorumPacket pkt) {
    +        for (LearnerHandler lh: activeObservers) {
    +            lh.queuePacket(pkt);
    +        }
    +        lastProposedZxid = pkt.getZxid();
    +    }
    +
    +    synchronized void proposalCommitted(long zxid) {
    +        QuorumPacket pkt = removeProposedPacket(zxid);
    +        if (pkt == null) {
    +            return;
    +        }
    +        cacheCommittedPacket(pkt);
    +        sendPacket(pkt);
    +    }
    +
    +    synchronized void informAndActivate(long zxid, long suggestedLeaderId) {
    +        QuorumPacket pkt = removeProposedPacket(zxid);
    +        if (pkt == null) {
    +            return;
    +        }
    +
    +        // Build the INFORMANDACTIVATE packet
    +        QuorumPacket informAndActivateQP = Leader.buildInformAndActivePacket(
    +                zxid, suggestedLeaderId, pkt.getData());
    +        cacheCommittedPacket(informAndActivateQP);
    +        sendPacket(informAndActivateQP);
    +    }
    +
    +    synchronized public void start() throws IOException {
    +        if (thread != null && thread.isAlive()) {
    +            return;
    +        }
    +        listenerRunning = true;
    +        if (self.getQuorumListenOnAllIPs()) {
    +            ss = new ServerSocket(port, 10 /* dog science */);
    +        } else {
    +            ss = new ServerSocket(port, 10 /* dog science */, self.getQuorumAddress().getAddress());
    +        }
    +        thread = new Thread(this, "ObserverMaster");
    +        thread.start();
    +        pinger = Executors.newSingleThreadScheduledExecutor();
    +        pinger.scheduleAtFixedRate(ping, self.tickTime /2, self.tickTime/2, TimeUnit.MILLISECONDS);
    +    }
    +
    +    public void run() {
    +        while (listenerRunning) {
    +            try {
    +                Socket s = ss.accept();
    --- End diff --
    
    One more thing: we recently added a change here that sets the socket timeout (setSoTimeout) to initLimit * tickTime, to align with what the leader is doing.
    
    We should add it to this patch as well.


---

Mime
View raw message