zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nkal...@apache.org
Subject [zookeeper] branch branch-3.5 updated: Revert "ZOOKEEPER-3911: Data inconsistency caused by DIFF sync uncommitted log"
Date Fri, 13 Nov 2020 17:39:03 GMT
This is an automated email from the ASF dual-hosted git repository.

nkalmar pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 049ac33  Revert "ZOOKEEPER-3911: Data inconsistency caused by DIFF sync uncommitted log"
049ac33 is described below

commit 049ac3322a1c0028b1bbd67020302537a56b7476
Author: Norbert Kalmar <nkalmar@apache.org>
AuthorDate: Fri Nov 13 18:34:37 2020 +0100

    Revert "ZOOKEEPER-3911: Data inconsistency caused by DIFF sync uncommitted log"
    
    This reverts commit 31032cf9818559da3500607378525b631aae02ab.
---
 .../apache/zookeeper/server/ZooKeeperServer.java   |  16 +-
 .../apache/zookeeper/server/quorum/Learner.java    |  19 +-
 .../server/quorum/DIFFSyncConsistencyTest.java     | 294 ---------------------
 3 files changed, 4 insertions(+), 325 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 23591c2..4e4feaf 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -467,19 +467,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     }
 
     public synchronized void startup() {
-        startupWithServerState(State.RUNNING);
-    }
-
-    public synchronized void startupWithoutServing() {
-        startupWithServerState(State.INITIAL);
-    }
-
-    public synchronized void startServing() {
-        setState(State.RUNNING);
-        notifyAll();
-    }
-
-    private void startupWithServerState(State state) {
         if (sessionTracker == null) {
             createSessionTracker();
         }
@@ -488,8 +475,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
 
         registerJMX();
 
-        setState(state);
-
+        setState(State.RUNNING);
         notifyAll();
     }
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 48d4962..2da5add 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -559,22 +559,8 @@ public class Learner {
                    }
                    
                     self.setCurrentEpoch(newEpoch);
-                    writeToTxnLog = true;
-                    //Anything after this needs to go to the transaction log, not applied directly in memory
+                    writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
                     isPreZAB1_0 = false;
-
-                    // ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER).
-                    sock.setSoTimeout(self.tickTime * self.syncLimit);
-                    self.setSyncMode(QuorumPeer.SyncMode.NONE);
-                    zk.startupWithoutServing();
-                    if (zk instanceof FollowerZooKeeperServer) {
-                        FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
-                        for (PacketInFlight p : packetsNotCommitted) {
-                            fzk.logRequest(p.hdr, p.rec, p.digest);
-                        }
-                        packetsNotCommitted.clear();
-                    }
-
                     writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                     break;
                 }
@@ -582,7 +568,8 @@ public class Learner {
         }
         ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
         writePacket(ack, true);
-        zk.startServing();
+        sock.setSoTimeout(self.tickTime * self.syncLimit);
+        zk.startup();
         /*
          * Update the election vote here to ensure that all members of the
          * ensemble report the same vote to new servers that start up and
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java
deleted file mode 100644
index 9b9ea55..0000000
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.util.Map;
-import javax.security.sasl.SaslException;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.PortAssignment;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooKeeper.States;
-import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.server.quorum.Leader.Proposal;
-import org.apache.zookeeper.test.ClientBase;
-import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-
-public class DIFFSyncConsistencyTest extends QuorumPeerTestBase {
-
-    private static int SERVER_COUNT = 3;
-    private MainThread[] mt = new MainThread[SERVER_COUNT];
-
-    @Test
-    @Timeout(value = 120)
-    public void testInconsistentDueToUncommittedLog() throws Exception {
-        final int LEADER_TIMEOUT_MS = 10_000;
-        final int[] clientPorts = new int[SERVER_COUNT];
-
-        StringBuilder sb = new StringBuilder();
-        String server;
-        for (int i = 0; i < SERVER_COUNT; i++) {
-            clientPorts[i] = PortAssignment.unique();
-            server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
-                    + ":participant;127.0.0.1:" + clientPorts[i];
-            sb.append(server + "\n");
-        }
-        String currentQuorumCfgSection = sb.toString();
-
-        for (int i = 0; i < SERVER_COUNT; i++) {
-            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
-                @Override
-                public TestQPMain getTestQPMain() {
-                    return new MockTestQPMain();
-                }
-            };
-            mt[i].start();
-        }
-
-        for (int i = 0; i < SERVER_COUNT; i++) {
-            assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT),
-                    "waiting for server " + i + " being up");
-        }
-
-        int leader = findLeader(mt);
-        CountdownWatcher watch = new CountdownWatcher();
-        ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[leader], ClientBase.CONNECTION_TIMEOUT, watch);
-        watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
-
-        Map<Long, Proposal> outstanding = mt[leader].main.quorumPeer.leader.outstandingProposals;
-        // Increase the tick time to delay the leader going to looking to allow us proposal a transaction while other
-        // followers are offline.
-        int previousTick = mt[leader].main.quorumPeer.tickTime;
-        mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS;
-        // Let the previous tick on the leader exhaust itself so the new tick time takes effect
-        Thread.sleep(previousTick);
-
-        LOG.info("LEADER ELECTED {}", leader);
-
-        // Shutdown followers to make sure we don't accidentally send the proposal we are going to make to follower.
-        // In other words, we want to make sure the followers get the proposal later through DIFF sync.
-        for (int i = 0; i < SERVER_COUNT; i++) {
-            if (i != leader) {
-                mt[i].shutdown();
-            }
-        }
-
-        // Send a create request to old leader and make sure it's synced to disk.
-        try {
-            zk.create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            fail("create /zk" + leader + " should have failed");
-        } catch (KeeperException e) {
-        }
-
-        // Make sure that we actually did get it in process at the leader; there can be extra sessionClose proposals.
-        assertTrue(outstanding.size() > 0);
-        Proposal p = findProposalOfType(outstanding, OpCode.create);
-        LOG.info("Old leader id: {}. All proposals: {}", leader, outstanding);
-        assertNotNull(p, "Old leader doesn't have 'create' proposal");
-
-        // Make sure leader sync the proposal to disk.
-        int sleepTime = 0;
-        Long longLeader = (long) leader;
-        while (!p.qvAcksetPairs.get(0).getAckset().contains(longLeader)) {
-            if (sleepTime > 2000) {
-                fail("Transaction not synced to disk within 1 second " + p.qvAcksetPairs.get(0).getAckset() + " expected " + leader);
-            }
-            Thread.sleep(100);
-            sleepTime += 100;
-        }
-
-        // Start controlled followers where we deliberately make the follower fail once follower receive the UPTODATE
-        // message from leader. Because followers only persist proposals from DIFF sync after UPTODATE, this can
-        // deterministically simulate the situation where followers ACK NEWLEADER (which makes leader think she has the
-        // quorum support, but actually not afterwards) but immediately fail afterwards without persisting the proposals
-        // from DIFF sync.
-        for (int i = 0; i < SERVER_COUNT; i++) {
-            if (i == leader) {
-                continue;
-            }
-
-            mt[i].start();
-            int sleepCount = 0;
-            while (mt[i].getQuorumPeer() == null) {
-                ++sleepCount;
-                if (sleepCount > 100) {
-                    fail("Can't start follower " + i + " !");
-                }
-                Thread.sleep(100);
-            }
-
-            ((CustomQuorumPeer) mt[i].getQuorumPeer()).setInjectError(true);
-            LOG.info("Follower {} started.", i);
-        }
-
-        // Verify leader can see it. The fact that leader can see it implies that
-        // leader should, at this point in time, get a quorum of ACK of NEWLEADER
-        // from two followers so leader can start serving requests; this also implies
-        // that DIFF sync from leader to followers are finished at this point in time.
-        // We then verify later that followers should have the same view after we shutdown
-        // this leader, otherwise it's a violation of ZAB / sequential consistency.
-        int c = 0;
-        while (c < 100) {
-            ++c;
-            try {
-                Stat stat = zk.exists("/zk" + leader, false);
-                assertNotNull(stat, "server " + leader + " should have /zk");
-                break;
-            } catch (KeeperException.ConnectionLossException e) {
-
-            }
-            Thread.sleep(100);
-        }
-
-        // Shutdown all servers
-        for (int i = 0; i < SERVER_COUNT; i++) {
-            mt[i].shutdown();
-        }
-        waitForOne(zk, States.CONNECTING);
-
-        // Now restart all servers except the old leader. Only old leader has the transaction sync to disk.
-        // The old followers only had in memory view of the transaction, and they didn't have a chance
-        // to sync to disk because we made them fail at UPTODATE.
-        for (int i = 0; i < SERVER_COUNT; i++) {
-            if (i == leader) {
-                continue;
-            }
-            mt[i].start();
-            int sleepCount = 0;
-            while (mt[i].getQuorumPeer() == null) {
-                ++sleepCount;
-                if (sleepCount > 100) {
-                    fail("Can't start follower " + i + " !");
-                }
-                Thread.sleep(100);
-            }
-
-            ((CustomQuorumPeer) mt[i].getQuorumPeer()).setInjectError(false);
-            LOG.info("Follower {} started again.", i);
-        }
-
-        int newLeader = findLeader(mt);
-        assertNotEquals(newLeader, leader, "new leader is still the old leader " + leader + " !!");
-
-        // This simulates the case where clients connected to the old leader had a view of the data
-        // "/zkX", but clients connect to the new leader does not have the same view of data (missing "/zkX").
-        // This inconsistent view of the quorum exposed from leaders is a violation of ZAB.
-        for (int i = 0; i < SERVER_COUNT; i++) {
-            if (i != newLeader) {
-                continue;
-            }
-            zk.close();
-            zk = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, watch);
-            watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
-            Stat val = zk.exists("/zk" + leader, false);
-            assertNotNull(val, "Data inconsistency detected! "
-                    + "Server " + i + " should have a view of /zk" + leader + "!");
-        }
-
-        zk.close();
-    }
-
-    @AfterEach
-    public void tearDown() {
-        for (int i = 0; i < mt.length; i++) {
-            try {
-                mt[i].shutdown();
-            } catch (InterruptedException e) {
-                LOG.warn("Quorum Peer interrupted while shutting it down", e);
-            }
-        }
-    }
-
-    static class CustomQuorumPeer extends QuorumPeer {
-
-        private volatile boolean injectError = false;
-
-        public CustomQuorumPeer() throws SaslException {
-
-        }
-
-        @Override
-        protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
-            return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) {
-
-                @Override
-                void readPacket(QuorumPacket pp) throws IOException {
-                    /**
-                     * In real scenario got SocketTimeoutException while reading
-                     * the packet from leader because of network problem, but
-                     * here throwing SocketTimeoutException based on whether
-                     * error is injected or not
-                     */
-                    super.readPacket(pp);
-                    if (injectError && pp.getType() == Leader.UPTODATE) {
-                        String type = LearnerHandler.packetToString(pp);
-                        throw new SocketTimeoutException("Socket timeout while reading the packet for operation "
-                                + type);
-                    }
-                }
-
-            };
-        }
-
-        public void setInjectError(boolean injectError) {
-            this.injectError = injectError;
-        }
-
-    }
-
-    static class MockTestQPMain extends TestQPMain {
-
-        @Override
-        protected QuorumPeer getQuorumPeer() throws SaslException {
-            return new CustomQuorumPeer();
-        }
-
-    }
-
-    private Proposal findProposalOfType(Map<Long, Proposal> proposals, int type) {
-        for (Proposal proposal : proposals.values()) {
-            if (proposal.request.getHdr().getType() == type) {
-                return proposal;
-            }
-        }
-        return null;
-    }
-
-    private int findLeader(MainThread[] mt) {
-        for (int i = 0; i < mt.length; i++) {
-            if (mt[i].main.quorumPeer.leader != null) {
-                return i;
-            }
-        }
-        return -1;
-    }
-}


Mime
View raw message