From commits-return-6159-archive-asf-public=cust-asf.ponee.io@zookeeper.apache.org Sat Feb 24 00:19:08 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 4C9B0180652 for ; Sat, 24 Feb 2018 00:19:07 +0100 (CET) Received: (qmail 35369 invoked by uid 500); 23 Feb 2018 23:19:06 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zookeeper.apache.org Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 35358 invoked by uid 99); 23 Feb 2018 23:19:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Feb 2018 23:19:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8C042EB4E3; Fri, 23 Feb 2018 23:19:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: afine@apache.org To: commits@zookeeper.apache.org Message-Id: <08a4dc3a0c9a4835b98be27f8841b9ea@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: zookeeper git commit: ZOOKEEPER-2845: Apply commit log when restarting server. Date: Fri, 23 Feb 2018 23:19:04 +0000 (UTC) Repository: zookeeper Updated Branches: refs/heads/branch-3.4 0666919fe -> e0af6ed75 ZOOKEEPER-2845: Apply commit log when restarting server. This is the version of #453 for the 3.4 branch Author: Robert Evans Reviewers: Abraham Fine , Mark Fenes , Andor Molnár , Kishor Patil Closes #455 from revans2/ZOOKEEPER-2845-3.4 Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/e0af6ed7 Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/e0af6ed7 Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/e0af6ed7 Branch: refs/heads/branch-3.4 Commit: e0af6ed7598fc4555d7625ddc8efd86e2281babf Parents: 0666919 Author: Robert Evans Authored: Fri Feb 23 15:18:55 2018 -0800 Committer: Abraham Fine Committed: Fri Feb 23 15:18:55 2018 -0800 ---------------------------------------------------------------------- .../org/apache/zookeeper/server/ZKDatabase.java | 41 ++- .../zookeeper/server/ZooKeeperServer.java | 24 +- .../server/persistence/FileTxnSnapLog.java | 16 ++ .../server/quorum/QuorumPeerMainTest.java | 257 ++++++++++++++----- 4 files changed, 254 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e0af6ed7/src/java/main/org/apache/zookeeper/server/ZKDatabase.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java index 72357b7..ce36422 100644 --- a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java +++ b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java @@ -201,7 +201,12 @@ public class ZKDatabase { return sessionsWithTimeouts; } - + private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() { + public void onTxnLoaded(TxnHeader hdr, Record txn){ + addCommittedProposal(hdr, txn); + } + }; + /** * load the database from the disk onto memory and also add * the transactions to the committedlog in memory. @@ -209,22 +214,30 @@ public class ZKDatabase { * @throws IOException */ public long loadDataBase() throws IOException { - PlayBackListener listener=new PlayBackListener(){ - public void onTxnLoaded(TxnHeader hdr,Record txn){ - Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(), - null, null); - r.txn = txn; - r.hdr = hdr; - r.zxid = hdr.getZxid(); - addCommittedProposal(r); - } - }; - - long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener); + long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener); initialized = true; return zxid; } - + + /** + * Fast forward the database adding transactions from the committed log into memory. + * @return the last valid zxid. + * @throws IOException + */ + public long fastForwardDataBase() throws IOException { + long zxid = snapLog.fastForwardFromEdits(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener); + initialized = true; + return zxid; + } + + private void addCommittedProposal(TxnHeader hdr, Record txn) { + Request r = new Request(null, 0, hdr.getCxid(), hdr.getType(), null, null); + r.txn = txn; + r.hdr = hdr; + r.zxid = hdr.getZxid(); + addCommittedProposal(r); + } + /** * maintains a list of last committedLog * or so committed requests. This is used for http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e0af6ed7/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java index cbf5fa5..977adc2 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java @@ -512,14 +512,24 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { firstProcessor.shutdown(); } - if (fullyShutDown && zkDb != null) { - zkDb.clear(); + if (zkDb != null) { + if (fullyShutDown) { + zkDb.clear(); + } else { + // else there is no need to clear the database + // * When a new quorum is established we can still apply the diff + // on top of the same zkDb data + // * If we fetch a new snapshot from leader, the zkDb will be + // cleared anyway before loading the snapshot + try { + //This will fast forward the database to the latest recorded transactions + zkDb.fastForwardDataBase(); + } catch (IOException e) { + LOG.error("Error updating DB", e); + zkDb.clear(); + } + } } - // else there is no need to clear the database - // * When a new quorum is established we can still apply the diff - // on top of the same zkDb data - // * If we fetch a new snapshot from leader, the zkDb will be - // cleared anyway before loading the snapshot unregisterJMX(); } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e0af6ed7/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java index b261a8e..0ba8491 100644 --- a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java +++ b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java @@ -168,6 +168,22 @@ public class FileTxnSnapLog { public long restore(DataTree dt, Map sessions, PlayBackListener listener) throws IOException { snapLog.deserialize(dt, sessions); + return fastForwardFromEdits(dt, sessions, listener); + } + + /** + * This function will fast forward the server database to have the latest + * transactions in it. This is the same as restore, but only reads from + * the transaction logs and not restores from a snapshot. + * @param dt the datatree to write transactions to. + * @param sessions the sessions to be restored. + * @param listener the playback listener to run on the + * database transactions. + * @return the highest zxid restored. + * @throws IOException + */ + public long fastForwardFromEdits(DataTree dt, Map sessions, + PlayBackListener listener) throws IOException { FileTxnLog txnLog = new FileTxnLog(dataDir); TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1); long highestZxid = dt.lastProcessedZxid; http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e0af6ed7/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java index 0399c97..cd09a36 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java @@ -50,6 +50,7 @@ import org.apache.log4j.WriterAppender; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; @@ -254,15 +255,8 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { numServers = 3; servers = LaunchServers(numServers); String path = "/hzxidtest"; - int leader=-1; + int leader = servers.findLeader(); - // find the leader - for (int i=0; i < numServers; i++) { - if (servers.mt[i].main.quorumPeer.leader != null) { - leader = i; - } - } - // make sure there is a leader Assert.assertTrue("There should be a leader", leader >=0); @@ -371,12 +365,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { servers = LaunchServers(numServers, 500); // find the leader - int trueLeader = -1; - for (int i = 0; i < numServers; i++) { - if (servers.mt[i].main.quorumPeer.leader != null) { - trueLeader = i; - } - } + int trueLeader = servers.findLeader(); Assert.assertTrue("There should be a leader", trueLeader >= 0); // find a follower @@ -437,27 +426,36 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { } private void waitForOne(ZooKeeper zk, States state) throws InterruptedException { - while(zk.getState() != state) { - Thread.sleep(500); - } + int iterations = ClientBase.CONNECTION_TIMEOUT / 500; + while (zk.getState() != state) { + if (iterations-- == 0) { + throw new RuntimeException("Waiting too long " + zk.getState() + " != " + state); + } + Thread.sleep(500); + } } - private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException { - int iterations = 10; - boolean someoneNotConnected = true; - while(someoneNotConnected) { - if (iterations-- == 0) { - ClientBase.logAllStackTraces(); - throw new RuntimeException("Waiting too long"); - } - - someoneNotConnected = false; - for(ZooKeeper zk: zks) { - if (zk.getState() != state) { - someoneNotConnected = true; - } - } - Thread.sleep(1000); + private void waitForAll(Servers servers, States state) throws InterruptedException { + waitForAll(servers.zk, state); + } + + private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException { + int iterations = ClientBase.CONNECTION_TIMEOUT / 1000; + boolean someoneNotConnected = true; + while (someoneNotConnected) { + if (iterations-- == 0) { + ClientBase.logAllStackTraces(); + throw new RuntimeException("Waiting too long"); + } + + someoneNotConnected = false; + for (ZooKeeper zk : zks) { + if (zk.getState() != state) { + someoneNotConnected = true; + break; + } + } + Thread.sleep(1000); } } @@ -465,48 +463,79 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { private class Servers { MainThread mt[]; ZooKeeper zk[]; + int[] clientPorts; + + public void shutDownAllServers() throws InterruptedException { + for (MainThread t: mt) { + t.shutdown(); + } + } + + public void restartAllServersAndClients(Watcher watcher) throws IOException { + for (MainThread t : mt) { + if (!t.isAlive()) { + t.start(); + } + } + for (int i = 0; i < zk.length; i++) { + restartClient(i, watcher); + } + } + + public void restartClient(int clientIndex, Watcher watcher) throws IOException { + zk[clientIndex] = new ZooKeeper("127.0.0.1:" + clientPorts[clientIndex], ClientBase.CONNECTION_TIMEOUT, watcher); + } + + public int findLeader() { + for (int i = 0; i < mt.length; i++) { + if (mt[i].main.quorumPeer.leader != null) { + return i; + } + } + return -1; + } } private Servers LaunchServers(int numServers) throws IOException, InterruptedException { return LaunchServers(numServers, null); } - /** * This is a helper function for launching a set of servers + /** + * This is a helper function for launching a set of servers * - * @param numServers* @param tickTime A ticktime to pass to MainThread + * @param numServers the number of servers + * @param tickTime A ticktime to pass to MainThread * @return * @throws IOException * @throws InterruptedException */ private Servers LaunchServers(int numServers, Integer tickTime) throws IOException, InterruptedException { - int SERVER_COUNT = numServers; - Servers svrs = new Servers(); - final int clientPorts[] = new int[SERVER_COUNT]; - StringBuilder sb = new StringBuilder(); - for(int i = 0; i < SERVER_COUNT; i++) { - clientPorts[i] = PortAssignment.unique(); - sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+"\n"); - } - String quorumCfgSection = sb.toString(); - - MainThread mt[] = new MainThread[SERVER_COUNT]; - ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT]; - for (int i = 0; i < SERVER_COUNT; i++) { - if (tickTime != null) { - mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection, new HashMap(), tickTime); - } else { - mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection); - } - mt[i].start(); - zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); - } - - waitForAll(zk, States.CONNECTED); - - svrs.mt = mt; - svrs.zk = zk; - return svrs; - } + int SERVER_COUNT = numServers; + Servers svrs = new Servers(); + svrs.clientPorts = new int[SERVER_COUNT]; + StringBuilder sb = new StringBuilder(); + for(int i = 0; i < SERVER_COUNT; i++) { + svrs.clientPorts[i] = PortAssignment.unique(); + sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+"\n"); + } + String quorumCfgSection = sb.toString(); + + svrs.mt = new MainThread[SERVER_COUNT]; + svrs.zk = new ZooKeeper[SERVER_COUNT]; + for(int i = 0; i < SERVER_COUNT; i++) { + if (tickTime != null) { + svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection, new HashMap(), tickTime); + } else { + svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection); + } + svrs.mt[i].start(); + svrs.restartClient(i, this); + } + + waitForAll(svrs, States.CONNECTED); + + return svrs; + } /** @@ -1120,4 +1149,106 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { return qp; } } + + @Test + public void testFailedTxnAsPartOfQuorumLoss() throws Exception { + final int LEADER_TIMEOUT_MS = 10000; + // 1. start up server and wait for leader election to finish + ClientBase.setupTestEnv(); + final int SERVER_COUNT = 3; + servers = LaunchServers(SERVER_COUNT); + + waitForAll(servers, States.CONNECTED); + + // we need to shutdown and start back up to make sure that the create session isn't the first transaction since + // that is rather innocuous. + servers.shutDownAllServers(); + waitForAll(servers, States.CONNECTING); + servers.restartAllServersAndClients(this); + waitForAll(servers, States.CONNECTED); + + // 2. kill all followers + int leader = servers.findLeader(); + Map outstanding = servers.mt[leader].main.quorumPeer.leader.outstandingProposals; + // increase the tick time to delay the leader going to looking + servers.mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS; + LOG.warn("LEADER " + leader); + + for (int i = 0; i < SERVER_COUNT; i++) { + if (i != leader) { + servers.mt[i].shutdown(); + } + } + + // 3. start up the followers to form a new quorum + for (int i = 0; i < SERVER_COUNT; i++) { + if (i != leader) { + servers.mt[i].start(); + } + } + + // 4. wait one of the follower to be the new leader + for (int i = 0; i < SERVER_COUNT; i++) { + if (i != leader) { + // Recreate a client session since the previous session was not persisted. + servers.restartClient(i, this); + waitForOne(servers.zk[i], States.CONNECTED); + } + } + + // 5. send a create request to old leader and make sure it's synced to disk, + // which means it acked from itself + try { + servers.zk[leader].create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + Assert.fail("create /zk" + leader + " should have failed"); + } catch (KeeperException e) { + } + + // just make sure that we actually did get it in process at the + // leader + Assert.assertEquals(1, outstanding.size()); + Proposal p = outstanding.values().iterator().next(); + Assert.assertEquals(OpCode.create, p.request.hdr.getType()); + + // make sure it has a chance to write it to disk + int sleepTime = 0; + Long longLeader = new Long(leader); + while (!p.ackSet.contains(longLeader)) { + if (sleepTime > 2000) { + Assert.fail("Transaction not synced to disk within 1 second " + p.ackSet + + " expected " + leader); + } + Thread.sleep(100); + sleepTime += 100; + } + + // 6. wait for the leader to quit due to not enough followers and come back up as a part of the new quorum + LOG.info("Waiting for leader " + leader + " to timeout followers"); + sleepTime = 0; + Follower f = servers.mt[leader].main.quorumPeer.follower; + while (f == null || !f.isRunning()) { + if (sleepTime > LEADER_TIMEOUT_MS * 2) { + Assert.fail("Took too long for old leader to time out " + servers.mt[leader].main.quorumPeer.getPeerState()); + } + Thread.sleep(100); + sleepTime += 100; + f = servers.mt[leader].main.quorumPeer.follower; + } + + int newLeader = servers.findLeader(); + // make sure a different leader was elected + Assert.assertTrue(leader != newLeader); + + // 7. restart the previous leader to force it to replay the edits and possibly come up in a bad state + servers.mt[leader].shutdown(); + servers.mt[leader].start(); + waitForAll(servers, States.CONNECTED); + + // 8. check the node exist in previous leader but not others + // make sure everything is consistent + for (int i = 0; i < SERVER_COUNT; i++) { + Assert.assertNull("server " + i + " should not have /zk" + leader, servers.zk[i].exists("/zk" + leader, false)); + } + } }