From commits-return-6157-archive-asf-public=cust-asf.ponee.io@zookeeper.apache.org Fri Feb 23 23:50:49 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 1D63C18067E for ; Fri, 23 Feb 2018 23:50:47 +0100 (CET) Received: (qmail 4361 invoked by uid 500); 23 Feb 2018 22:50:46 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zookeeper.apache.org Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 4350 invoked by uid 99); 23 Feb 2018 22:50:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Feb 2018 22:50:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C6B47EB4D5; Fri, 23 Feb 2018 22:50:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: afine@apache.org To: commits@zookeeper.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: zookeeper git commit: ZOOKEEPER-2845: Apply commit log when restarting server. Date: Fri, 23 Feb 2018 22:50:43 +0000 (UTC) Repository: zookeeper Updated Branches: refs/heads/master f0b67b6e4 -> 722ba9409 ZOOKEEPER-2845: Apply commit log when restarting server. I will also be creating patches/pull requests for 3.4 and 3.5, but I wanted to get a pull request up for others to look at ASAP. I have a version of this based on #310 at https://github.com/revans2/zookeeper/tree/ZOOKEEPER-2845-orig-test-patch, but its test is flaky: leader election frequently does not go as planned in that test, so the test fails, but not because the servers ended up in an inconsistent state. 
I am happy to answer any questions anyone has about the patch. Author: Robert Evans Reviewers: Abraham Fine , Mark Fenes , Andor Molnár , Kishor Patil Closes #453 from revans2/ZOOKEEPER-2845-master and squashes the following commits: 28c074a26 [Robert Evans] Addressed review comments 583e34435 [Robert Evans] Using framework APIs for test f26a21ad6 [Robert Evans] Addressed review comments 93168d716 [Robert Evans] Added in a modified version of the test 3d042f981 [Robert Evans] ZOOKEEPER-2845: Apply commit log when restarting server. Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/722ba940 Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/722ba940 Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/722ba940 Branch: refs/heads/master Commit: 722ba9409a44a35d287aac803813f508cff2420a Parents: f0b67b6 Author: Robert Evans Authored: Fri Feb 23 14:49:00 2018 -0800 Committer: Abraham Fine Committed: Fri Feb 23 14:49:00 2018 -0800 ---------------------------------------------------------------------- .../org/apache/zookeeper/server/ZKDatabase.java | 28 ++- .../zookeeper/server/ZooKeeperServer.java | 24 ++- .../server/persistence/FileTxnSnapLog.java | 16 ++ .../server/quorum/QuorumPeerMainTest.java | 186 +++++++++++++++---- 4 files changed, 209 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/722ba940/src/java/main/org/apache/zookeeper/server/ZKDatabase.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java index 6679e78..a03c955 100644 --- a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java +++ b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java @@ -223,6 +223,11 @@ public class ZKDatabase { return 
sessionsWithTimeouts; } + private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() { + public void onTxnLoaded(TxnHeader hdr, Record txn){ + addCommittedProposal(hdr, txn); + } + }; /** * load the database from the disk onto memory and also add @@ -231,18 +236,27 @@ public class ZKDatabase { * @throws IOException */ public long loadDataBase() throws IOException { - PlayBackListener listener=new PlayBackListener(){ - public void onTxnLoaded(TxnHeader hdr,Record txn){ - Request r = new Request(0, hdr.getCxid(),hdr.getType(), hdr, txn, hdr.getZxid()); - addCommittedProposal(r); - } - }; + long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener); + initialized = true; + return zxid; + } - long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener); + /** + * Fast forward the database adding transactions from the committed log into memory. + * @return the last valid zxid. + * @throws IOException + */ + public long fastForwardDataBase() throws IOException { + long zxid = snapLog.fastForwardFromEdits(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener); initialized = true; return zxid; } + private void addCommittedProposal(TxnHeader hdr, Record txn) { + Request r = new Request(0, hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); + addCommittedProposal(r); + } + /** * maintains a list of last committedLog * or so committed requests. 
This is used for http://git-wip-us.apache.org/repos/asf/zookeeper/blob/722ba940/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java index c8cd72d..9099b2f 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java @@ -557,14 +557,24 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { firstProcessor.shutdown(); } - if (fullyShutDown && zkDb != null) { - zkDb.clear(); + if (zkDb != null) { + if (fullyShutDown) { + zkDb.clear(); + } else { + // else there is no need to clear the database + // * When a new quorum is established we can still apply the diff + // on top of the same zkDb data + // * If we fetch a new snapshot from leader, the zkDb will be + // cleared anyway before loading the snapshot + try { + //This will fast forward the database to the latest recorded transactions + zkDb.fastForwardDataBase(); + } catch (IOException e) { + LOG.error("Error updating DB", e); + zkDb.clear(); + } + } } - // else there is no need to clear the database - // * When a new quorum is established we can still apply the diff - // on top of the same zkDb data - // * If we fetch a new snapshot from leader, the zkDb will be - // cleared anyway before loading the snapshot unregisterJMX(); } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/722ba940/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java index 3ca1781..8702bf3 100644 --- 
a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java +++ b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java @@ -239,6 +239,22 @@ public class FileTxnSnapLog { return -1L; } } + return fastForwardFromEdits(dt, sessions, listener); + } + + /** + * This function will fast forward the server database to have the latest + * transactions in it. This is the same as restore, but only reads from + * the transaction logs and not restores from a snapshot. + * @param dt the datatree to write transactions to. + * @param sessions the sessions to be restored. + * @param listener the playback listener to run on the + * database transactions. + * @return the highest zxid restored. + * @throws IOException + */ + public long fastForwardFromEdits(DataTree dt, Map sessions, + PlayBackListener listener) throws IOException { TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1); long highestZxid = dt.lastProcessedZxid; TxnHeader hdr; http://git-wip-us.apache.org/repos/asf/zookeeper/blob/722ba940/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java index 9c6bd3a..43c341a 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java @@ -39,6 +39,7 @@ import org.apache.log4j.WriterAppender; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; @@ -249,14 +250,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { numServers = 3; servers = 
LaunchServers(numServers); String path = "/hzxidtest"; - int leader = -1; - - // find the leader - for (int i = 0; i < numServers; i++) { - if (servers.mt[i].main.quorumPeer.leader != null) { - leader = i; - } - } + int leader = servers.findLeader(); // make sure there is a leader Assert.assertTrue("There should be a leader", leader >= 0); @@ -366,12 +360,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { servers = LaunchServers(numServers, 500); // find the leader - int trueLeader = -1; - for (int i = 0; i < numServers; i++) { - if (servers.mt[i].main.quorumPeer.leader != null) { - trueLeader = i; - } - } + int trueLeader = servers.findLeader(); Assert.assertTrue("There should be a leader", trueLeader >= 0); // find a follower @@ -435,12 +424,16 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { int iterations = ClientBase.CONNECTION_TIMEOUT / 500; while (zk.getState() != state) { if (iterations-- == 0) { - throw new RuntimeException("Waiting too long"); + throw new RuntimeException("Waiting too long " + zk.getState() + " != " + state); } Thread.sleep(500); } } + private void waitForAll(Servers servers, States state) throws InterruptedException { + waitForAll(servers.zk, state); + } + private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException { int iterations = ClientBase.CONNECTION_TIMEOUT / 1000; boolean someoneNotConnected = true; @@ -465,6 +458,37 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { private static class Servers { MainThread mt[]; ZooKeeper zk[]; + int[] clientPorts; + + public void shutDownAllServers() throws InterruptedException { + for (MainThread t: mt) { + t.shutdown(); + } + } + + public void restartAllServersAndClients(Watcher watcher) throws IOException { + for (MainThread t : mt) { + if (!t.isAlive()) { + t.start(); + } + } + for (int i = 0; i < zk.length; i++) { + restartClient(i, watcher); + } + } + + public void restartClient(int clientIndex, Watcher watcher) throws 
IOException { + zk[clientIndex] = new ZooKeeper("127.0.0.1:" + clientPorts[clientIndex], ClientBase.CONNECTION_TIMEOUT, watcher); + } + + public int findLeader() { + for (int i = 0; i < mt.length; i++) { + if (mt[i].main.quorumPeer.leader != null) { + return i; + } + } + return -1; + } } @@ -474,7 +498,8 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { /** * This is a helper function for launching a set of servers * - * @param numServers* @param tickTime A ticktime to pass to MainThread + * @param numServers the number of servers + * @param tickTime A ticktime to pass to MainThread * @return * @throws IOException * @throws InterruptedException @@ -482,30 +507,28 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { private Servers LaunchServers(int numServers, Integer tickTime) throws IOException, InterruptedException { int SERVER_COUNT = numServers; Servers svrs = new Servers(); - final int clientPorts[] = new int[SERVER_COUNT]; + svrs.clientPorts = new int[SERVER_COUNT]; StringBuilder sb = new StringBuilder(); for(int i = 0; i < SERVER_COUNT; i++) { - clientPorts[i] = PortAssignment.unique(); - sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+clientPorts[i]+"\n"); + svrs.clientPorts[i] = PortAssignment.unique(); + sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+svrs.clientPorts[i]+"\n"); } String quorumCfgSection = sb.toString(); - MainThread mt[] = new MainThread[SERVER_COUNT]; - ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT]; + svrs.mt = new MainThread[SERVER_COUNT]; + svrs.zk = new ZooKeeper[SERVER_COUNT]; for(int i = 0; i < SERVER_COUNT; i++) { if (tickTime != null) { - mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection, new HashMap(), tickTime); + svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection, new HashMap(), tickTime); } else { - mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection); + svrs.mt[i] = new 
MainThread(i, svrs.clientPorts[i], quorumCfgSection); } - mt[i].start(); - zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); + svrs.mt[i].start(); + svrs.restartClient(i, this); } - - waitForAll(zk, States.CONNECTED); - - svrs.mt = mt; - svrs.zk = zk; + + waitForAll(svrs, States.CONNECTED); + return svrs; } @@ -673,7 +696,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { + ":" + electionPort1 + ";" + CLIENT_PORT_QP1 + "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + electionPort2 + ";" + CLIENT_PORT_QP2; - + MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection); MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection); q1.start(); @@ -888,4 +911,105 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { maxSessionTimeOut, quorumPeer.getMaxSessionTimeout()); } + @Test + public void testFailedTxnAsPartOfQuorumLoss() throws Exception { + final int LEADER_TIMEOUT_MS = 10_000; + // 1. start up server and wait for leader election to finish + ClientBase.setupTestEnv(); + final int SERVER_COUNT = 3; + servers = LaunchServers(SERVER_COUNT); + + waitForAll(servers, States.CONNECTED); + + // we need to shutdown and start back up to make sure that the create session isn't the first transaction since + // that is rather innocuous. + servers.shutDownAllServers(); + waitForAll(servers, States.CONNECTING); + servers.restartAllServersAndClients(this); + waitForAll(servers, States.CONNECTED); + + // 2. kill all followers + int leader = servers.findLeader(); + Map outstanding = servers.mt[leader].main.quorumPeer.leader.outstandingProposals; + // increase the tick time to delay the leader going to looking + servers.mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS; + LOG.warn("LEADER {}", leader); + + for (int i = 0; i < SERVER_COUNT; i++) { + if (i != leader) { + servers.mt[i].shutdown(); + } + } + + // 3. 
start up the followers to form a new quorum + for (int i = 0; i < SERVER_COUNT; i++) { + if (i != leader) { + servers.mt[i].start(); + } + } + + // 4. wait for one of the followers to become the new leader + for (int i = 0; i < SERVER_COUNT; i++) { + if (i != leader) { + // Recreate a client session since the previous session was not persisted. + servers.restartClient(i, this); + waitForOne(servers.zk[i], States.CONNECTED); + } + } + + // 5. send a create request to the old leader and make sure it is synced to disk, + // which means the old leader has acked it itself + try { + servers.zk[leader].create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + Assert.fail("create /zk" + leader + " should have failed"); + } catch (KeeperException e) { + } + + // make sure the proposal actually became outstanding at the + // leader + Assert.assertEquals(1, outstanding.size()); + Proposal p = outstanding.values().iterator().next(); + Assert.assertEquals(OpCode.create, p.request.getHdr().getType()); + + // give the leader a chance to write it to disk (the loop allows ~2 seconds, although the failure message says 1 second) + int sleepTime = 0; + Long longLeader = new Long(leader); + while (!p.qvAcksetPairs.get(0).getAckset().contains(longLeader)) { + if (sleepTime > 2000) { + Assert.fail("Transaction not synced to disk within 1 second " + p.qvAcksetPairs.get(0).getAckset() + + " expected " + leader); + } + Thread.sleep(100); + sleepTime += 100; + } + + // 6. 
wait for the old leader to step down for lack of followers and rejoin the new quorum as a follower + LOG.info("Waiting for leader {} to timeout followers", leader); + sleepTime = 0; + Follower f = servers.mt[leader].main.quorumPeer.follower; + while (f == null || !f.isRunning()) { + if (sleepTime > LEADER_TIMEOUT_MS * 2) { + Assert.fail("Took too long for old leader to time out " + servers.mt[leader].main.quorumPeer.getPeerState()); + } + Thread.sleep(100); + sleepTime += 100; + f = servers.mt[leader].main.quorumPeer.follower; + } + + int newLeader = servers.findLeader(); + // make sure a different leader was elected + Assert.assertNotEquals(leader, newLeader); + + // 7. restart the previous leader to force it to replay the edits and possibly come up in a bad state + servers.mt[leader].shutdown(); + servers.mt[leader].start(); + waitForAll(servers, States.CONNECTED); + + // 8. check that the node exists on no server, including the previous leader, + // i.e. make sure everything is consistent + for (int i = 0; i < SERVER_COUNT; i++) { + Assert.assertNull("server " + i + " should not have /zk" + leader, servers.zk[i].exists("/zk" + leader, false)); + } + } }