hadoop-common-commits mailing list archives

From Apache Wiki <wikidi...@apache.org>
Subject [Hadoop Wiki] Update of "ZooKeeper/Observers/ReviewGuide" by HenryRobinson
Date Wed, 11 Nov 2009 01:51:10 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Hadoop Wiki" for change notification.

The "ZooKeeper/Observers/ReviewGuide" page has been changed by HenryRobinson.
http://wiki.apache.org/hadoop/ZooKeeper/Observers/ReviewGuide

--------------------------------------------------

New page:
= A Short Guide to Reviewing ZOOKEEPER-368 =
= Configuration =
== QuorumPeerConfig.java ==
Configuration changes are mainly in QuorumPeerConfig.java:

{{{
+    protected final HashMap<Long,QuorumServer> observers =
+        new HashMap<Long, QuorumServer>();
}}}
We must keep track of which servers are Followers and which are Observers so that quorums
can be constructed only from voting members. This HashMap mimics the servers HashMap, and
is combined with servers after quorum construction.

{{{
+    protected LearnerType peerType = LearnerType.PARTICIPANT;
}}}
Keep track of the type of the QuorumServer (peerType can be renamed to learnerType).

{{{
+            } else if (key.equals("peerType")) {
+                if (value.toLowerCase().equals("observer")) {
+                    peerType = LearnerType.OBSERVER;
+                } else if (value.toLowerCase().equals("participant")) {
+                    peerType = LearnerType.PARTICIPANT;
+                } else
+                {
+                    throw new ConfigException("Unrecognised peertype: " + value);
+                }
}}}
Setting peerType=observer is how a node is configured as an Observer; this code parses that option.
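For reviewers who want to experiment, here is a minimal stand-alone sketch of the same parsing rule, assuming the configuration is read with java.util.Properties. PeerTypeParser, the local LearnerType enum and the unchecked ConfigException are hypothetical stand-ins, not the patch's classes. It uses equalsIgnoreCase, which accepts the same values as toLowerCase().equals(...) here but does not depend on the default locale.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical stand-in for QuorumPeer.LearnerType.
enum LearnerType { PARTICIPANT, OBSERVER }

// Hypothetical stand-in for the config exception; unchecked here for brevity.
class ConfigException extends IllegalArgumentException {
    ConfigException(String msg) { super(msg); }
}

class PeerTypeParser {
    // Reads the optional peerType key; an absent key means PARTICIPANT (the default).
    static LearnerType parsePeerType(Properties cfg) {
        String value = cfg.getProperty("peerType");
        if (value == null) {
            return LearnerType.PARTICIPANT;
        }
        value = value.trim(); // Properties keeps trailing whitespace in values
        if (value.equalsIgnoreCase("observer")) {
            return LearnerType.OBSERVER;
        } else if (value.equalsIgnoreCase("participant")) {
            return LearnerType.PARTICIPANT;
        }
        throw new ConfigException("Unrecognised peertype: " + value);
    }

    // Loads key=value configuration text.
    static Properties load(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for a StringReader
        }
        return p;
    }
}
```

Unknown values fail fast at start-up instead of silently defaulting to PARTICIPANT, matching the patch's behaviour.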

{{{
+                if ((parts.length != 2) && (parts.length != 3) && (parts.length != 4)) {
                     LOG.error(value
-                       + " does not have the form host:port or host:port:port");
+                       + " does not have the form host:port or host:port:port " +
+                       " or host:port:port:type");
}}}
We must also add a new form for the server specification to identify Observers so that they
can be excluded from quorum construction.

{{{
+                } else if (parts.length == 4) {
+                    InetSocketAddress electionAddr = new InetSocketAddress(
+                            parts[0], Integer.parseInt(parts[2]));
+                    LearnerType type = LearnerType.PARTICIPANT;
+                    if (parts[3].toLowerCase().equals("observer")) {
+                        type = LearnerType.OBSERVER;
+                        observers.put(Long.valueOf(sid), new QuorumServer(sid, addr,
+                                electionAddr,type));
+                    } else if (parts[3].toLowerCase().equals("participant")) {
+                        type = LearnerType.PARTICIPANT;
+                        servers.put(Long.valueOf(sid), new QuorumServer(sid, addr,
+                                electionAddr,type));
+                    } else {
+                        throw new ConfigException("Unrecognised peertype: " + value);
+                    }
}}}
Note that if :observer is specified, the QuorumServer goes into the observers HashMap, otherwise
it goes into servers.

{{{
+        if (observers.size() > 0 && electionAlg != 0) {
+            throw new IllegalArgumentException("Observers must currently be used with simple leader election" +
+                       " (set electionAlg=0)");
+        }
}}}
We can't use Observers with election algorithms other than 0 at the moment. This will change
fairly shortly.
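Putting the configuration rules of this section together, the following is a simplified sketch of how a server.<sid> value could be split between the two maps, including the electionAlg=0 restriction. ServerSpec and ServerLineParser are hypothetical; addresses are created unresolved to avoid DNS lookups, whereas the patch uses new InetSocketAddress(host, port).

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for QuorumPeer.QuorumServer.
final class ServerSpec {
    final long id;
    final InetSocketAddress addr;
    final InetSocketAddress electionAddr; // null for the host:port form
    final boolean observer;

    ServerSpec(long id, InetSocketAddress addr, InetSocketAddress electionAddr, boolean observer) {
        this.id = id;
        this.addr = addr;
        this.electionAddr = electionAddr;
        this.observer = observer;
    }
}

class ServerLineParser {
    final Map<Long, ServerSpec> servers = new HashMap<>();   // voting members: used to size quorums
    final Map<Long, ServerSpec> observers = new HashMap<>(); // never counted in quorums

    // Parses host:port, host:port:port or host:port:port:type.
    void parse(long sid, String value) {
        String[] parts = value.split(":");
        if (parts.length < 2 || parts.length > 4) {
            throw new IllegalArgumentException(value + " does not have the form host:port"
                    + " or host:port:port or host:port:port:type");
        }
        String host = parts[0];
        InetSocketAddress addr = InetSocketAddress.createUnresolved(host, Integer.parseInt(parts[1]));
        InetSocketAddress electionAddr = parts.length >= 3
                ? InetSocketAddress.createUnresolved(host, Integer.parseInt(parts[2]))
                : null;
        boolean observer = false;
        if (parts.length == 4) {
            if (parts[3].equalsIgnoreCase("observer")) {
                observer = true;
            } else if (!parts[3].equalsIgnoreCase("participant")) {
                throw new IllegalArgumentException("Unrecognised peertype: " + value);
            }
        }
        Map<Long, ServerSpec> target = observer ? observers : servers;
        target.put(sid, new ServerSpec(sid, addr, electionAddr, observer));
    }

    // Observers currently require simple leader election (electionAlg=0).
    void validate(int electionAlg) {
        if (!observers.isEmpty() && electionAlg != 0) {
            throw new IllegalArgumentException(
                    "Observers must currently be used with simple leader election (set electionAlg=0)");
        }
    }

    // After quorum construction, the two maps are combined into the full membership.
    Map<Long, ServerSpec> allServers() {
        Map<Long, ServerSpec> all = new HashMap<>(servers);
        all.putAll(observers);
        return all;
    }
}
```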

= Operation =
== QuorumPeer.java ==
{{{
+    public enum LearnerType {
+        PARTICIPANT, OBSERVER;
+    }
+    private LearnerType peerType = LearnerType.PARTICIPANT;
+
+    public LearnerType getPeerType() {
+        return peerType;
+    }
+
+    public void setPeerType(LearnerType p) {
+        peerType = p;
+    }
}}}
Although ServerState captures the dynamic state of a node, it does not tell a LOOKING node whether it should move to OBSERVING or FOLLOWING once it has found the Leader. LearnerType records this static role, so that the state transition is obvious.
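The static learner type plus the election result is enough to pick the next ServerState. The following is a pure-function sketch under that assumption; StateTransition.afterElection is hypothetical, and returning LOOKING for an elected Observer models LeaderElection's handling of that case (log an error, sleep, and keep looking).

```java
// Hypothetical stand-ins for QuorumPeer.LearnerType and QuorumPeer.ServerState.
enum LearnerType { PARTICIPANT, OBSERVER }
enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

class StateTransition {
    // Next state for a LOOKING peer once leaderId has been chosen.
    static ServerState afterElection(LearnerType type, long myId, long leaderId) {
        if (type == LearnerType.OBSERVER) {
            if (leaderId == myId) {
                // An Observer must never lead: stay LOOKING and let the ensemble try again.
                return ServerState.LOOKING;
            }
            return ServerState.OBSERVING;
        }
        // Participants lead if they won, otherwise follow.
        return leaderId == myId ? ServerState.LEADING : ServerState.FOLLOWING;
    }
}
```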

{{{
+    protected Map<Long, QuorumServer> quorumPeers;
}}}
quorumPeers is now declared protected.

{{{
+
+    protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
+        return new Observer(this, new ObserverZooKeeperServer(logFactory,
+                this, new ZooKeeperServer.BasicDataTreeBuilder()));
+    }
}}}
Following the same pattern as Follower and Leader, makeObserver constructs a new Observer.

{{{
+                        case OBSERVING:
+                            // Do nothing, Observers keep themselves to
+                            // themselves.
+                            break;
}}}
Explicitly ensure that Observers do not respond to leader inquiries. This is the simplest way to make sure that their votes are not counted; the alternative would be for the client to know whether each peer is an Observer. In fact, the client does not query Observers, but it is good to be defensive here.
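As a simplified sketch of why silence is enough: if an OBSERVING peer never replies, its vote simply never reaches the tally. InquiryResponder and Vote are hypothetical; the sketch assumes, for brevity, that every non-observing peer replies with its current vote, which glosses over per-state details of the real responder.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for QuorumPeer.ServerState.
enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

// Hypothetical, simplified vote: the server this peer currently supports.
final class Vote {
    final long leaderId;
    final long zxid;
    Vote(long leaderId, long zxid) { this.leaderId = leaderId; this.zxid = zxid; }
}

class InquiryResponder {
    // Reply to a leader inquiry, or stay silent when OBSERVING.
    static Optional<Vote> respond(ServerState state, Vote currentVote) {
        if (state == ServerState.OBSERVING) {
            return Optional.empty(); // Observers keep themselves to themselves
        }
        return Optional.of(currentVote);
    }

    // The querying side counts only the replies it actually received.
    static int votesFor(long candidate, List<Optional<Vote>> replies) {
        int n = 0;
        for (Optional<Vote> r : replies) {
            if (r.isPresent() && r.get().leaderId == candidate) {
                n++;
            }
        }
        return n;
    }
}
```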

{{{
+                       new QuorumMaj(countParticipants(quorumPeers)));
}}}
Previously this was quorumPeers.size(). Now Observers must be distinguished from Followers, so only participants are counted when sizing the majority.
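A minimal sketch of the effect, assuming a QuorumMaj-style rule (a set is a quorum if it is strictly larger than half the participant count). MajorityCheck is hypothetical, and unlike the real QuorumMaj it also filters out acknowledgements from non-participants itself. The test data reproduces the testObserver scenario: two voting servers plus one Observer, with one voter down.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for QuorumPeer.LearnerType.
enum LearnerType { PARTICIPANT, OBSERVER }

class MajorityCheck {
    // Same idea as countParticipants: Observers do not contribute to the quorum size.
    static int countParticipants(Map<Long, LearnerType> peers) {
        int count = 0;
        for (LearnerType t : peers.values()) {
            if (t == LearnerType.PARTICIPANT) {
                count++;
            }
        }
        return count;
    }

    // Majority rule: strictly more than half of the participants must agree.
    static boolean containsQuorum(Set<Long> ackers, Map<Long, LearnerType> peers) {
        int half = countParticipants(peers) / 2;
        int votes = 0;
        for (Long sid : ackers) {
            if (peers.get(sid) == LearnerType.PARTICIPANT) {
                votes++;
            }
        }
        return votes > half;
    }
}
```

With peers {1, 2} voting and 3 observing, servers 1 and 3 alone are not a quorum; if server 3 were a participant, they would be.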

{{{
+        if (getPeerType() == LearnerType.PARTICIPANT) {
+            startLeaderElection();
+        }
}}}
Observers should not instigate the leader election (LE) process. Instead, they wait for a Leader to be elected and then look for a majority of votes to identify it.

{{{
+        for (QuorumServer p : getView().values()) {
}}}
Direct accesses to quorumPeers should now be replaced by getView or getVotingView calls.

{{{
+    /**
+     * Count the number of nodes in the map that could be followers.
+     * @param peers
+     * @return The number of followers in the map
+     */
+    protected static int countParticipants(Map<Long,QuorumServer> peers) {
+      int count = 0;
+      for (QuorumServer q : peers.values()) {
+          if (q.type == LearnerType.PARTICIPANT) {
+              count++;
+          }
+      }
+      return count;
+    }
}}}
countParticipants is not synchronized on peers. It probably should be once the dynamic ensembles patch lands, but for the time being all updates to peers happen at start-up time.

{{{
+                case OBSERVING:
+                    try {
+                        LOG.info("OBSERVING");
+                        setObserver(makeObserver(logFactory));
+                        observer.observeLeader();
+                    } catch (Exception e) {
+                        LOG.warn("Unexpected exception",e );
+                    } finally {
+                        observer.shutdown();
+                        setObserver(null);
+                        setPeerState(ServerState.LOOKING);
+                    }
+                    break;
}}}
This is the Observer's case of the main driver loop; it is much the same as the Follower's.

{{{
+     * A 'view' is a node's current opinion of the membership of the entire
+     * ensemble.
      */
     public Map<Long,QuorumPeer.QuorumServer> getView() {
-        return this.quorumPeers;
+        return Collections.unmodifiableMap(this.quorumPeers);
+    }
+
+    /**
+     * Observers are not contained in this view, only nodes with
+     * PeerType=PARTICIPANT.
+     */
+    public Map<Long,QuorumPeer.QuorumServer> getVotingView() {
+        Map<Long,QuorumPeer.QuorumServer> ret =
+            new HashMap<Long, QuorumPeer.QuorumServer>();
+        Map<Long,QuorumPeer.QuorumServer> view = getView();
+        for (QuorumServer server : view.values()) {
+            if (server.type == LearnerType.PARTICIPANT) {
+                ret.put(server.id, server);
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Returns only observers, no followers.
+     */
+    public Map<Long,QuorumPeer.QuorumServer> getObservingView() {
+        Map<Long,QuorumPeer.QuorumServer> ret =
+            new HashMap<Long, QuorumPeer.QuorumServer>();
+        Map<Long,QuorumPeer.QuorumServer> view = getView();
+        for (QuorumServer server : view.values()) {
+            if (server.type == LearnerType.OBSERVER) {
+                ret.put(server.id, server);
+            }
+        }
+        return ret;
}}}
Three functions return different varieties of 'view' of the current ensemble. Note the Collections.unmodifiableMap wrapper: the map returned by getView is read-only.
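The three accessors can be sketched with one shared filtering helper; the patch duplicates that loop in getVotingView and getObservingView. Views and Server are hypothetical, simplified stand-ins for QuorumPeer and QuorumServer.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for QuorumPeer.LearnerType.
enum LearnerType { PARTICIPANT, OBSERVER }

// Hypothetical, simplified stand-in for QuorumPeer.QuorumServer.
final class Server {
    final long id;
    final LearnerType type;
    Server(long id, LearnerType type) { this.id = id; this.type = type; }
}

class Views {
    private final Map<Long, Server> quorumPeers = new HashMap<>();

    void add(Server s) { quorumPeers.put(s.id, s); }

    // Read-only view of the whole ensemble; writes throw UnsupportedOperationException.
    Map<Long, Server> getView() { return Collections.unmodifiableMap(quorumPeers); }

    // Voting members only.
    Map<Long, Server> getVotingView() { return filter(LearnerType.PARTICIPANT); }

    // Observers only.
    Map<Long, Server> getObservingView() { return filter(LearnerType.OBSERVER); }

    // Returns a fresh copy, so callers may modify the result without touching quorumPeers.
    private Map<Long, Server> filter(LearnerType type) {
        Map<Long, Server> ret = new HashMap<>();
        for (Server s : getView().values()) {
            if (s.type == type) {
                ret.put(s.id, s);
            }
        }
        return ret;
    }
}
```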

{{{
+    /**
+     * Only used by QuorumStats at the moment
+     */
}}}
getQuorumPeers is a strangely named function that returns a String representation of the total view; this comment was added to show that it is not really part of the core QuorumPeer API.

== Leader.java ==
{{{
+    protected HashSet<LearnerHandler> observingLearners = new HashSet<LearnerHandler>();
}}}
Although LearnerHandlers can currently speak to both Followers and Observers, this may not always be the case. It is also necessary to keep Observers separate so that they receive only INFORM messages, not PROPOSAL messages.

{{{
+     * This tells the leader that the connecting peer is actually an observer
+     */
+    final static int OBSERVERINFO = 16;
+
}}}
This is how the Leader distinguishes Observers from Followers.

{{{
+    /**
+     * This message type informs observers of a committed proposal.
+     */
+    final static int INFORM = 8;
}}}
This new message type is the key to the Observer protocol. INFORM messages are much the same as PROPOSAL messages, but their semantics are that the attached proposal has already been agreed. An INFORM is sent once a quorum of ACKs has been received for a given PROPOSAL. At that point the PROPOSAL is guaranteed to eventually commit at a quorum of peers, so it is safe for the Observer to receive this message.
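A simplified sketch of when an INFORM goes out, assuming one ACK per voting member and ignoring the Leader's own ACK and pipelining details. AckTracker is hypothetical, and its outbox list merely records the messages that would be sent.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the Leader's ACK bookkeeping.
class AckTracker {
    private final int votingMembers;                      // participants only
    private final Map<Long, Set<Long>> acksByZxid = new HashMap<>();
    private final Set<Long> committed = new HashSet<>();
    final List<String> outbox = new ArrayList<>();        // messages "sent", for illustration

    AckTracker(int votingMembers) { this.votingMembers = votingMembers; }

    // A PROPOSAL has gone to the Followers; start collecting their ACKs.
    void propose(long zxid) {
        acksByZxid.put(zxid, new HashSet<>());
    }

    // Record an ACK from a voting member; Observers never ACK proposals.
    void processAck(long sid, long zxid) {
        Set<Long> acks = acksByZxid.get(zxid);
        if (acks == null || committed.contains(zxid)) {
            return; // unknown proposal, or already agreed: late ACKs change nothing
        }
        acks.add(sid);
        if (acks.size() > votingMembers / 2) {
            committed.add(zxid);
            outbox.add("COMMIT " + zxid); // Followers already hold the PROPOSAL data
            outbox.add("INFORM " + zxid); // Observers get the agreed proposal in one message
        }
    }
}
```

Because the INFORM is only emitted after the majority check, an Observer never sees a proposal that could still be abandoned.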

{{{
+                inform(p);

+    /**
+     * Create an inform packet and send it to all observers.
+     * @param zxid
+     * @param proposal
+     */
+    public void inform(Proposal proposal) {
+        QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid,
+                                            proposal.packet.getData(), null);
+        sendObserverPacket(qp);
+    }
}}}
Constructs an INFORM packet and sends it to all Observers reliably.

{{{
+    /**
+     * send a packet to all observers
+     */
+    void sendObserverPacket(QuorumPacket qp) {
+        synchronized(observingLearners) {
+            for (LearnerHandler f : observingLearners) {
+                f.queuePacket(qp);
+            }
+        }
+    }
+
}}}
This is where the separate data structure for Observer LearnerHandlers pays off. This code may be called very frequently (once per committed proposal), so testing the type of every LearnerHandler here to find the Observers might take too long.
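The trade-off can be sketched side by side: a dedicated observer set versus scanning all learners and testing each one's type on every packet. Handler and Broadcaster are hypothetical, simplified stand-ins; the sketch assumes Observers are registered in both sets, which may differ from the patch.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-in for LearnerHandler: it just queues packets.
class Handler {
    final boolean observer;
    final List<String> queue = new ArrayList<>();
    Handler(boolean observer) { this.observer = observer; }
    void queuePacket(String packet) { queue.add(packet); }
}

class Broadcaster {
    private final Set<Handler> learners = new HashSet<>();          // every connected learner
    private final Set<Handler> observingLearners = new HashSet<>(); // Observers only

    void addLearner(Handler h) {
        synchronized (learners) { learners.add(h); }
        if (h.observer) {
            synchronized (observingLearners) { observingLearners.add(h); }
        }
    }

    // Dedicated set: no per-packet type test, and Followers are never visited.
    void sendObserverPacket(String packet) {
        synchronized (observingLearners) {
            for (Handler h : observingLearners) {
                h.queuePacket(packet);
            }
        }
    }

    // The alternative: visit every learner and branch on its type for each packet.
    void sendObserverPacketByFiltering(String packet) {
        synchronized (learners) {
            for (Handler h : learners) {
                if (h.observer) {
                    h.queuePacket(packet);
                }
            }
        }
    }
}
```

Both deliver the same packets; the dedicated set costs one extra insertion per connection and saves a full scan per INFORM.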

== LeaderElection.java ==
{{{
+                        /**
+                         * We want to make sure we implement the state machine
+                         * correctly. If we are a PARTICIPANT, once a leader
+                         * is elected we can move either to LEADING or
+                         * FOLLOWING. However if we are an OBSERVER, it is an
+                         * error to be elected as a Leader.
+                         */
+                        if (self.getPeerType() == LearnerType.OBSERVER) {
+                            if (current.id == self.getId()) {
+                                // This should never happen!
+                                LOG.error("OBSERVER elected as leader!");
+                                Thread.sleep(100);
+                            }
+                            else {
+                                self.setPeerState(ServerState.OBSERVING);
+                                Thread.sleep(100);
+                                return current;
+                            }
+                        } else {
+                            self.setPeerState((current.id == self.getId())
+                                    ? ServerState.LEADING: ServerState.FOLLOWING);
+                            if (self.getPeerState() == ServerState.FOLLOWING) {
+                                Thread.sleep(100);
+                            }
+                            return current;
}}}
As the comment says, we want to move from LOOKING to OBSERVING correctly. If, somehow, an
Observer is elected as a Leader, it will sleep (in the hope that this encourages the ensemble
to vote someone else in).

== AuthFastLeaderElection.java / FastLeaderElection.java ==
The only changes are to use the getView APIs.

== QuorumCnxManager.java ==

The only changes are to use the getView APIs.

== LearnerHandler.java ==
{{{
+    private LearnerType  learnerType = LearnerType.PARTICIPANT;
+    public LearnerType getLearnerType() {
+        return learnerType;
+    }
}}}
Used by Leader to distinguish the type of Learner being handled.

{{{
+                    if (this.learnerType == LearnerType.OBSERVER) {
+                        LOG.error("Received ACK from Observer  " + this.sid);
+                    }
}}}
Defensively log an error when an ACK is received from an Observer. This check could be removed, especially if it hurts throughput. Also, Observers send an ACK during the Leader-sync stage of the join protocol, which causes this error message to be written; this is a further reason to remove it in the future.

== ObserverRequestProcessor.java (new file) ==
This simple request processor deals with forwarding requests to an Observer object. In the future, it might be customized to change its behaviour, for example to batch requests.

== Observer.java (new file) ==
Observer extends Learner.

{{{
+    void observeLeader() throws InterruptedException {
+        zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
+
+        try {
+            InetSocketAddress addr = findLeader();
+            LOG.info("Observing " + addr);
+            try {
+                connectToLeader(addr);
+                long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
+
+                syncWithLeader(newLeaderZxid);
+                QuorumPacket qp = new QuorumPacket();
+                while (self.running) {
+                    readPacket(qp);
+                    processPacket(qp);
+                }
+            } catch (IOException e) {
+                LOG.warn("Exception when observing the leader", e);
+                try {
+                    sock.close();
+                } catch (IOException e1) {
+                    e1.printStackTrace();
+                }
+
+                synchronized (pendingRevalidations) {
+                    // clear pending revalidations
+                    pendingRevalidations.clear();
+                    pendingRevalidations.notifyAll();
+                }
+            }
+        } finally {
+            zk.unregisterJMX(this);
+        }
+    }
}}}
Simple driver loop for Observers: read a packet, then process it. Pending session revalidations are dealt with in the same way as for Followers.

{{{
+    protected void processPacket(QuorumPacket qp) throws IOException{
+        switch (qp.getType()) {
+        case Leader.PING:
+            ping(qp);
+            break;
+        case Leader.PROPOSAL:
+            LOG.warn("Ignoring proposal");
+            break;
+        case Leader.COMMIT:
+            LOG.warn("Ignoring commit");
+            break;
+        case Leader.UPTODATE:
+            zk.takeSnapshot();
+            self.cnxnFactory.setZooKeeperServer(zk);
+            break;
+        case Leader.REVALIDATE:
+            revalidate(qp);
+            break;
+        case Leader.SYNC:
+            ((ObserverZooKeeperServer)zk).sync();
+            break;
+        case Leader.INFORM:
+            TxnHeader hdr = new TxnHeader();
+            BinaryInputArchive ia = BinaryInputArchive
+                    .getArchive(new ByteArrayInputStream(qp.getData()));
+            Record txn = SerializeUtils.deserializeTxn(ia, hdr);
+            Request request = new Request (null, hdr.getClientId(),
+                                           hdr.getCxid(),
+                                           hdr.getType(), null, null);
+            request.txn = txn;
+            request.hdr = hdr;
+            ObserverZooKeeperServer obs = (ObserverZooKeeperServer)zk;
+            obs.commitRequest(request);
+            break;
+        }
+    }
}}}
This code describes how the Observer reacts to each packet type. PROPOSAL and COMMIT are ignored and should never be received (so should these be LOG.error rather than LOG.warn?). The most interesting case is INFORM, which unpacks the committed proposal and tells the ObserverZooKeeperServer to commit it. (Note the downcast to ObserverZooKeeperServer; it could be optimized away.)
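The dispatch can be reduced to a small sketch that separates "should never happen" packets from INFORM. PacketType, Packet and ObserverSketch are hypothetical; the payload is an opaque string instead of a serialized transaction, and the sketch treats unexpected PROPOSAL/COMMIT packets as errors, which is one possible answer to the question above, not what the patch does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical subset of the Leader packet types, as an enum rather than int constants.
enum PacketType { PING, PROPOSAL, COMMIT, UPTODATE, REVALIDATE, SYNC, INFORM }

// Hypothetical, simplified packet: a type, a zxid and an opaque payload.
final class Packet {
    final PacketType type;
    final long zxid;
    final String payload;
    Packet(PacketType type, long zxid, String payload) {
        this.type = type;
        this.zxid = zxid;
        this.payload = payload;
    }
}

class ObserverSketch {
    final List<String> applied = new ArrayList<>(); // committed transactions, in arrival order
    final List<String> log = new ArrayList<>();     // stand-in for LOG output

    void processPacket(Packet qp) {
        switch (qp.type) {
            case PROPOSAL:
            case COMMIT:
                // An Observer should never receive these; record them as errors.
                log.add("ERROR unexpected " + qp.type + " " + qp.zxid);
                break;
            case INFORM:
                // Already agreed by a quorum, so it can be committed directly.
                applied.add(qp.payload);
                break;
            default:
                // PING, UPTODATE, REVALIDATE and SYNC handling is omitted from this sketch.
                break;
        }
    }
}
```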

== ObserverZooKeeperServer.java (new file) ==
ObserverZooKeeperServer extends LearnerZooKeeperServer.

A lot of code is shared between this class and FollowerZooKeeperServer, but the class hierarchy
does not provide a good place to share the code.

{{{
+    private CommitProcessor commitProcessor;
+    private SyncRequestProcessor syncProcessor;
}}}
The RequestProcessor pipeline is simpler than a Follower's: commitProcessor handles commits that result from INFORM messages, and syncProcessor handles synchronisation requests from the client.

= Testing =
== ObserverTest.java ==
This contains tests to make sure that Observers are behaving correctly.

testObserver ensures that commands are forwarded correctly through an Observer to an ensemble, and that quorum is lost in a case where a voting Observer would have kept the ensemble quorate (i.e., with two Followers and one Observer, when one Follower fails).

testSingleObserver makes sure that a singleton Observer in an ensemble can't come up on its
own.

testLeaderElectionFail ensures that a cluster with electionAlg != 0 won't come up if it contains Observers.

== QuorumBase.java ==
Various changes here to add options to start a cluster with two Observers.

== AsyncHammerTest.java ==
{{{
+    @Test
+    public void testObserversHammer() throws Exception {
+        qb.tearDown();
+        qb.setUp(true);
+        bang = true;
+        Thread[] hammers = new Thread[100];
+        for (int i = 0; i < hammers.length; i++) {
+            hammers[i] = new HammerThread("HammerThread-" + i);
+            hammers[i].start();
+        }
+        Thread.sleep(5000); // allow the clients to run for max 5sec
+        bang = false;
+        for (int i = 0; i < hammers.length; i++) {
+            hammers[i].interrupt();
+            verifyThreadTerminated(hammers[i], 60000);
+        }
+        // before restart
+        qb.verifyRootOfAllServersMatch(qb.hostPort);
+    }
}}}
Mimics the AsyncHammerTest with Observers.

Other tests: should there also be a synchronous hammer equivalent?
