zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ph...@apache.org
Subject svn commit: r1759907 - in /zookeeper/trunk: CHANGES.txt src/java/main/org/apache/zookeeper/server/quorum/Learner.java src/java/test/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
Date Thu, 08 Sep 2016 20:59:36 GMT
Author: phunt
Date: Thu Sep  8 20:59:36 2016
New Revision: 1759907

URL: http://svn.apache.org/viewvc?rev=1759907&view=rev
Log:
ZOOKEEPER-2172: Cluster crashes when reconfig a new node as a participant (Arshad Mohammad
via phunt)

Added:
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1759907&r1=1759906&r2=1759907&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Thu Sep  8 20:59:36 2016
@@ -377,6 +377,9 @@ BUGFIXES:
   "config -c" when client port is mentioned as separate and not like
   new style (Rakesh Kumar Singh via phunt)
 
+  ZOOKEEPER-2172: Cluster crashes when reconfig a new node as a participant
+  (Arshad Mohammad via phunt)
+
 IMPROVEMENTS:
   ZOOKEEPER-2024 Major throughput improvement with mixed workloads (Kfir Lev-Ari via shralex)
 

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1759907&r1=1759906&r2=1759907&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Thu Sep  8 20:59:36 2016
@@ -431,19 +431,19 @@ public class Learner {
                     break;
                 case Leader.COMMIT:
                 case Leader.COMMITANDACTIVATE:
+                    pif = packetsNotCommitted.peekFirst();
+                    if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE)
{
+                        QuorumVerifier qv = self.configFromString(new String(((SetDataTxn)
pif.rec).getData()));
+                        boolean majorChange = self.processReconfig(qv, ByteBuffer.wrap(qp.getData()).getLong(),
+                                qp.getZxid(), true);
+                        if (majorChange) {
+                            throw new Exception("changes proposed in reconfig");
+                        }
+                    }
                     if (!snapshotTaken) {
-                        pif = packetsNotCommitted.peekFirst();
                         if (pif.hdr.getZxid() != qp.getZxid()) {
                             LOG.warn("Committing " + qp.getZxid() + ", but next proposal
is " + pif.hdr.getZxid());
                         } else {
-                           if (qp.getType() == Leader.COMMITANDACTIVATE) {
-                               QuorumVerifier qv = self.configFromString(new String(((SetDataTxn)pif.rec).getData()));
-                               boolean majorChange =
-                                       self.processReconfig(qv, ByteBuffer.wrap(qp.getData()).getLong(),
qp.getZxid(), true);
-                                if (majorChange) {
-                                   throw new Exception("changes proposed in reconfig");
-                                }
-                           }
                             zk.processTxn(pif.hdr, pif.rec);
                             packetsNotCommitted.remove();
                         }

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java?rev=1759907&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java Thu Sep  8 20:59:36 2016
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.admin.AdminServer.AdminServerException;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Regression test for ZOOKEEPER-2172: a reconfig that adds a new participant
+ * while that participant is still syncing with the leader.
+ */
+public class ReconfigDuringLeaderSyncTest extends QuorumPeerTestBase {
+    protected static final Logger LOG = LoggerFactory.getLogger(ReconfigDuringLeaderSyncTest.class);
+    private static final int SERVER_COUNT = 3;
+    private MainThread[] mt;
+
+    /**
+     * <pre>
+     * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2172.
+     * Cluster crashes when reconfig a new node as a participant.
+     * </pre>
+     *
+     * This issue occurs when a reconfig's PROPOSAL and COMMITANDACTIVATE arrive
+     * between the snapshot and the UPTODATE message. In that case
+     * processReconfig was not invoked on the newly added node, and
+     * zoo.cfg.dynamic.next was not deleted.
+     */
+    @Test
+    public void testDuringLeaderSync() throws Exception {
+        final int[] clientPorts = new int[SERVER_COUNT + 1];
+        StringBuilder sb = new StringBuilder();
+        String[] serverConfig = new String[SERVER_COUNT + 1];
+
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            serverConfig[i] = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":"
+                    + PortAssignment.unique() + ":participant;127.0.0.1:" + clientPorts[i];
+            sb.append(serverConfig[i] + "\n");
+        }
+        String currentQuorumCfgSection = sb.toString();
+        mt = new MainThread[SERVER_COUNT + 1];
+
+        // start 3 servers
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false);
+            mt[i].start();
+        }
+
+        // ensure all servers started
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            Assert.assertTrue("waiting for server " + i + " being up",
+                    ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT));
+        }
+        CountdownWatcher watch = new CountdownWatcher();
+        ZooKeeper preReconfigClient = new ZooKeeper("127.0.0.1:" + clientPorts[0],
+                ClientBase.CONNECTION_TIMEOUT, watch);
+        ZooKeeper postReconfigClient = null;
+        try {
+            watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+
+            // new server joining
+            int joinerId = SERVER_COUNT;
+            clientPorts[joinerId] = PortAssignment.unique();
+            serverConfig[joinerId] = "server." + joinerId + "=127.0.0.1:" + PortAssignment.unique()
+                    + ":" + PortAssignment.unique() + ":participant;127.0.0.1:"
+                    + clientPorts[joinerId];
+
+            // Find leader id.
+            int leaderId = -1;
+            for (int i = 0; i < SERVER_COUNT; i++) {
+                if (mt[i].main.quorumPeer.leader != null) {
+                    leaderId = i;
+                    break;
+                }
+            }
+            assertFalse("no leader found among the initial servers", leaderId == -1);
+
+            // Joiner initial config consists of itself and the leader.
+            sb = new StringBuilder();
+            sb.append(serverConfig[leaderId] + "\n").append(serverConfig[joinerId] + "\n");
+
+            /*
+             * The joiner delays its response to the NEWLEADER message while the
+             * reconfig command is issued, so that it processes messages in the
+             * following order:
+             *
+             * NEWLEADER
+             * reconfig's PROPOSAL
+             * reconfig's COMMITANDACTIVATE
+             * UPTODATE
+             */
+            mt[joinerId] = new MainThread(joinerId, clientPorts[joinerId], sb.toString(), false) {
+                @Override
+                public TestQPMain getTestQPMain() {
+                    return new MockTestQPMain();
+                }
+            };
+            mt[joinerId].start();
+            CustomQuorumPeer qp = getCustomQuorumPeer(mt[joinerId]);
+
+            // delete any already existing .next file
+            String nextDynamicConfigFilename = qp.getNextDynamicConfigFilename();
+            File nextDynaFile = new File(nextDynamicConfigFilename);
+            nextDynaFile.delete();
+
+            // Issue the reconfig only once the joiner has received Leader.NEWLEADER
+            // (i.e. has started sending its delayed ACK).
+            waitForNewLeaderMessage(qp);
+            preReconfigClient.reconfig(serverConfig[joinerId], null, null, -1, null, null);
+
+            watch = new CountdownWatcher();
+            postReconfigClient = new ZooKeeper("127.0.0.1:" + clientPorts[joinerId],
+                    ClientBase.CONNECTION_TIMEOUT, watch);
+            watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+            // do one successful operation on the newly added node
+            postReconfigClient.create("/reconfigIssue", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+            assertFalse("zoo.cfg.dynamic.next is not deleted.", nextDynaFile.exists());
+
+            // Verify that the joiner has an up-to-date config including all four servers.
+            // j is a long so that get(j) boxes to Long, the key type of the member maps.
+            for (long j = 0; j <= SERVER_COUNT; j++) {
+                assertNotNull("server " + j + " is not present in the new quorum",
+                        qp.getQuorumVerifier().getVotingMembers().get(j));
+            }
+        } finally {
+            // Close clients even when an assertion above fails, so sessions do not leak.
+            preReconfigClient.close();
+            if (postReconfigClient != null) {
+                postReconfigClient.close();
+            }
+        }
+    }
+
+    /**
+     * Polls until {@code qp} reports that it has started answering NEWLEADER,
+     * failing the test if that does not happen within CONNECTION_TIMEOUT.
+     */
+    private static void waitForNewLeaderMessage(CustomQuorumPeer qp) throws InterruptedException {
+        long deadline = System.currentTimeMillis() + CONNECTION_TIMEOUT;
+        while (!qp.isNewLeaderMessage()) {
+            if (System.currentTimeMillis() > deadline) {
+                Assert.fail("timed out waiting for the joiner to receive NEWLEADER");
+            }
+            Thread.sleep(10);
+        }
+    }
+
+    /**
+     * Polls until {@code mt} has created its quorum peer and returns it, failing
+     * the test if that does not happen within CONNECTION_TIMEOUT.
+     *
+     * @param mt a main thread whose TestQPMain creates a {@link CustomQuorumPeer}
+     * @return the peer created by {@code mt}
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    private static CustomQuorumPeer getCustomQuorumPeer(MainThread mt) throws InterruptedException {
+        long deadline = System.currentTimeMillis() + CONNECTION_TIMEOUT;
+        QuorumPeer quorumPeer;
+        while ((quorumPeer = mt.getQuorumPeer()) == null) {
+            if (System.currentTimeMillis() > deadline) {
+                Assert.fail("timed out waiting for the quorum peer to be created");
+            }
+            Thread.sleep(10);
+        }
+        return (CustomQuorumPeer) quorumPeer;
+    }
+
+    @After
+    public void tearDown() {
+        // Stop all servers. Entries may still be null if the test failed before
+        // creating them (e.g. the joiner), so skip those instead of throwing NPE.
+        if (null != mt) {
+            for (MainThread server : mt) {
+                if (server == null) {
+                    continue;
+                }
+                try {
+                    server.shutdown();
+                } catch (InterruptedException e) {
+                    LOG.warn("Quorum Peer interrupted while shutting it down", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Quorum peer whose follower delays every outgoing ACK by 100 ms and records
+     * that an ACK was sent, so the test can time its reconfig call.
+     */
+    private static class CustomQuorumPeer extends QuorumPeer {
+        // Written by the follower thread and polled by the test thread, hence volatile.
+        private volatile boolean newLeaderMessage = false;
+
+        public CustomQuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir,
+                int clientPort, int electionAlg, long myid, int tickTime, int initLimit,
+                int syncLimit) throws IOException {
+            super(quorumPeers, snapDir, logDir, electionAlg, myid, tickTime, initLimit, syncLimit,
+                    false, ServerCnxnFactory.createFactory(new InetSocketAddress(clientPort), -1),
+                    new QuorumMaj(quorumPeers));
+        }
+
+        /**
+         * Returns whether this follower has started sending an ACK, the first of
+         * which is its response to NEWLEADER. That ACK is written to the leader
+         * only after an additional 100 ms delay.
+         *
+         * @return true once an ACK has been handed to writePacket
+         */
+        public boolean isNewLeaderMessage() {
+            return newLeaderMessage;
+        }
+
+        @Override
+        protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
+            return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) {
+                @Override
+                void writePacket(QuorumPacket pp, boolean flush) throws IOException {
+                    if (pp != null && pp.getType() == Leader.ACK) {
+                        newLeaderMessage = true;
+                        try {
+                            /*
+                             * Delay the ACK a follower sends in response to a
+                             * NEWLEADER message, so that the leader has a chance
+                             * to send the reconfig and only then the UPTODATE
+                             * message.
+                             */
+                            Thread.sleep(100);
+                        } catch (InterruptedException e) {
+                            // Preserve the interrupt status instead of swallowing it.
+                            LOG.warn("Interrupted while delaying ACK", e);
+                            Thread.currentThread().interrupt();
+                        }
+                    }
+                    super.writePacket(pp, flush);
+                }
+            };
+        }
+    }
+
+    /** TestQPMain that runs a {@link CustomQuorumPeer} instead of a plain QuorumPeer. */
+    private static class MockTestQPMain extends TestQPMain {
+        @Override
+        public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
+            quorumPeer = new CustomQuorumPeer(config.getQuorumVerifier().getAllMembers(),
+                    config.getDataDir(), config.getDataLogDir(),
+                    config.getClientPortAddress().getPort(), config.getElectionAlg(),
+                    config.getServerId(), config.getTickTime(), config.getInitLimit(),
+                    config.getSyncLimit());
+            quorumPeer.start();
+            try {
+                quorumPeer.join();
+            } catch (InterruptedException e) {
+                LOG.warn("Quorum Peer interrupted", e);
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+}



Mime
View raw message