zookeeper-commits mailing list archives

From h...@apache.org
Subject zookeeper git commit: ZOOKEEPER-2678: Discovery and Sync can take a very long time on large DB (3.4)
Date Tue, 14 Feb 2017 18:05:57 GMT
Repository: zookeeper
Updated Branches:
  refs/heads/branch-3.4 e8247eec1 -> 2de93fe45


ZOOKEEPER-2678: Discovery and Sync can take a very long time on large DB (3.4)

This patch reduces the time it takes to recover from losing a leader when the DB is large.

It does this in two ways: by not clearing the in-memory DB before leader election begins, and by
not taking a snapshot during the SYNC phase when the sync is a DIFF. Instead of snapshotting, the
proposals and commits received during a DIFF sync are buffered and written to the transaction log,
just as the code already does for proposals/commits sent after the NEWLEADER and before the
UPTODATE message.
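A minimal sketch of the buffering described above, assuming a simplified packet stream. `SyncSketch`, `Outcome` and the array-based input are hypothetical names invented for illustration, not ZooKeeper's `Learner` API; only the packet type names and the `snapshotNeeded`/`writeToTxnLog` flags come from the patch. A DIFF buffers everything for the transaction log from the first packet, while a SNAP applies commits in memory until NEWLEADER arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the learner's sync-loop bookkeeping.
// Packet type names mirror Zab messages; everything else is invented for illustration.
class SyncSketch {
    enum Type { DIFF, SNAP, PROPOSAL, COMMIT, NEWLEADER, UPTODATE }

    static final class Outcome {
        final List<Long> appliedInMemory = new ArrayList<>(); // applied directly to the in-memory DB
        final List<Long> loggedProposals = new ArrayList<>(); // written to the txn log after the loop
        final List<Long> bufferedCommits = new ArrayList<>(); // committed after being logged
    }

    // types[i] and zxids[i] describe the i-th packet; types[0] must be DIFF or SNAP.
    static Outcome sync(Type[] types, long[] zxids) {
        Outcome out = new Outcome();
        boolean snapshotNeeded = types[0] != Type.DIFF; // only a DIFF may skip the snapshot
        boolean writeToTxnLog = !snapshotNeeded;        // so a DIFF buffers from the first packet
        List<Long> notCommitted = new ArrayList<>();
        loop:
        for (int i = 1; i < types.length; i++) {
            switch (types[i]) {
                case PROPOSAL:
                    notCommitted.add(zxids[i]);
                    break;
                case COMMIT:
                    if (!writeToTxnLog) {
                        // SNAP before NEWLEADER: apply straight to the in-memory DB
                        notCommitted.remove(Long.valueOf(zxids[i]));
                        out.appliedInMemory.add(zxids[i]);
                    } else {
                        out.bufferedCommits.add(zxids[i]);
                    }
                    break;
                case NEWLEADER:
                    // (the real code snapshots here only when snapshotNeeded is true)
                    writeToTxnLog = true; // everything after NEWLEADER goes through the txn log
                    break;
                case UPTODATE:
                    break loop;
                default:
                    throw new IllegalArgumentException("unexpected packet " + types[i]);
            }
        }
        out.loggedProposals.addAll(notCommitted); // buffered proposals are logged, not applied in memory
        return out;
    }
}
```

For a DIFF stream `PROPOSAL 5, COMMIT 5, NEWLEADER, UPTODATE`, nothing is applied in memory and zxid 5 ends up logged and committed afterwards; for a SNAP stream, the commit seen before NEWLEADER is applied in memory instead.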

If a SNAP is sent, we cannot avoid writing out the full snapshot, because there is no other
way to make sure the on-disk DB is in sync with what is in memory. Otherwise, any edits appended
to the edit log before a background snapshot happened could be applied on top of an incorrect
snapshot.
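A toy model of why the SNAP case still needs the snapshot, assuming recovery means "load the newest on-disk snapshot, then replay the transaction log on top of it" and treating every transaction as a plain key/value write. `SnapRecoverySketch` and its data are hypothetical; real ZooKeeper snapshots and txn logs are far richer.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: a restarted server rebuilds its DB by loading the
// on-disk snapshot and replaying the txn log on top of it.
class SnapRecoverySketch {
    // Each txn is {key, value} and simply sets key -> value.
    static Map<String, String> recover(Map<String, String> diskSnapshot, List<String[]> txnLog) {
        Map<String, String> db = new HashMap<>(diskSnapshot); // do not mutate the input
        for (String[] txn : txnLog) {
            db.put(txn[0], txn[1]);
        }
        return db;
    }

    public static void main(String[] args) {
        Map<String, String> staleSnapshot = new HashMap<>();  // follower's old snapshot on disk
        staleSnapshot.put("/a", "old");
        staleSnapshot.put("/b", "divergent");
        Map<String, String> leaderSnapshot = new HashMap<>(); // state received in the SNAP
        leaderSnapshot.put("/a", "new");
        List<String[]> logAfterSnap = Arrays.asList(new String[][] {{"/c", "1"}});

        Map<String, String> inMemory = recover(leaderSnapshot, logAfterSnap);
        // If the SNAP was never written to disk, recovery replays on the stale snapshot:
        System.out.println(recover(staleSnapshot, logAfterSnap).equals(inMemory)); // false
        // Once the SNAP is persisted, recovery matches the in-memory state:
        System.out.println(recover(leaderSnapshot, logAfterSnap).equals(inMemory)); // true
    }
}
```

With a DIFF the situation is different: the follower's existing snapshot plus log is assumed to be a prefix of the leader's history, so the diff transactions can be replayed on top of whatever snapshot already exists, which is what lets the patch skip it.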

The same optimization should also work for TRUNC, but I opted not to apply it there: TRUNC is
rare, and by its very nature it already forces the DB to be reread after the edit logs are
modified, so it would still not be fast.

In practice, this cuts the time for the cluster to recover from losing a leader from 5+ minutes
to about 3 seconds.

I am happy to port this to 3.5 if it looks good.

Author: Robert (Bobby) Evans <evans@yahoo-inc.com>

Reviewers: Flavio Junqueira <fpj@apache.org>, Edward Ribeiro <edward.ribeiro@gmail.com>,
Abraham Fine <afine@apache.org>, Michael Han <hanm@apache.org>

Closes #157 from revans2/ZOOKEEPER-2678 and squashes the following commits:

d079617 [Robert (Bobby) Evans] ZOOKEEPER-2678:  Improved test to verify snapshot times
dcbf325 [Robert (Bobby) Evans] Addressed review comments
f57c384 [Robert (Bobby) Evans] ZOOKEEPER-2678: Fixed another race
f705293 [Robert (Bobby) Evans] ZOOKEEPER-2678: Addressed review comments
5aa2562 [Robert (Bobby) Evans] ZOOKEEPER-2678: Discovery and Sync can take a very long time on large DBs


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/2de93fe4
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/2de93fe4
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/2de93fe4

Branch: refs/heads/branch-3.4
Commit: 2de93fe450b909ff76ac8bd8fa44296a515845a5
Parents: e8247ee
Author: Robert (Bobby) Evans <evans@yahoo-inc.com>
Authored: Tue Feb 14 10:05:52 2017 -0800
Committer: Michael Han <hanm@apache.org>
Committed: Tue Feb 14 10:05:52 2017 -0800

----------------------------------------------------------------------
 .../zookeeper/server/ZooKeeperServer.java       | 18 +++++++++--
 .../zookeeper/server/ZooKeeperServerMain.java   |  2 +-
 .../apache/zookeeper/server/quorum/Learner.java | 34 +++++++++++++-------
 .../zookeeper/server/quorum/Zab1_0Test.java     | 29 ++++++++++++++++-
 4 files changed, 67 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2de93fe4/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
index 62ac466..b0a8341 100644
--- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -489,7 +489,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return state == State.RUNNING;
     }
 
-    public synchronized void shutdown() {
+    public void shutdown() {
+        shutdown(false);
+    }
+
+    /**
+     * Shut down the server instance
+     * @param fullyShutDown true if another server using the same database will not replace this one in the same process
+     */
+    public synchronized void shutdown(boolean fullyShutDown) {
         if (!canShutdown()) {
             LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
             return;
@@ -507,9 +515,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         if (firstProcessor != null) {
             firstProcessor.shutdown();
         }
-        if (zkDb != null) {
+
+        if (fullyShutDown && zkDb != null) {
             zkDb.clear();
         }
+        // else there is no need to clear the database
+        //  * When a new quorum is established we can still apply the diff
+        //    on top of the same zkDb data
+        //  * If we fetch a new snapshot from leader, the zkDb will be
+        //    cleared anyway before loading the snapshot
 
         unregisterJMX();
     }
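The `ZooKeeperServer` change is an overload that keeps the old `shutdown()` signature and makes clearing the database opt-in. The sketch below models only that decision; `ServerSketch` and its `Map`-backed database are hypothetical stand-ins for `ZooKeeperServer` and `ZKDatabase`, not their real APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ZooKeeperServer: only the shutdown/clear decision is modeled.
class ServerSketch {
    private final Map<String, String> zkDb; // in-memory DB, possibly reused by a successor
    private boolean running = true;

    ServerSketch(Map<String, String> zkDb) {
        this.zkDb = zkDb;
    }

    // The old no-arg entry point keeps its signature and defaults to a partial shutdown.
    public void shutdown() {
        shutdown(false);
    }

    public synchronized void shutdown(boolean fullyShutDown) {
        if (!running) {
            return; // already shut down
        }
        running = false;
        if (fullyShutDown && zkDb != null) {
            zkDb.clear(); // nothing in this process will reuse the data
        }
        // Otherwise keep the data: a successor can apply a DIFF on top of it,
        // and a SNAP would clear it anyway before loading.
    }

    public static void main(String[] args) {
        Map<String, String> db = new HashMap<>();
        db.put("/app/config", "v1");
        new ServerSketch(db).shutdown();     // e.g. a follower stepping down for election
        System.out.println(db.size());       // 1: data survives for the next server
        new ServerSketch(db).shutdown(true); // e.g. standalone ZooKeeperServerMain exiting
        System.out.println(db.size());       // 0
    }
}
```

Defaulting the no-arg method to `false` means every existing caller in quorum mode gets the faster behavior without changes, while the standalone path opts in to a full clear, as the `ZooKeeperServerMain` hunk does.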

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2de93fe4/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
index 612d227..1b0f59f 100644
--- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
+++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
@@ -124,7 +124,7 @@ public class ZooKeeperServerMain {
 
             cnxnFactory.join();
             if (zkServer.canShutdown()) {
-                zkServer.shutdown();
+                zkServer.shutdown(true);
             }
         } catch (InterruptedException e) {
             // warn, but generally this is ok

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2de93fe4/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
index 749b274..c54f6e6 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
@@ -321,13 +321,16 @@ public class Learner {
         QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
         QuorumPacket qp = new QuorumPacket();
         long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
-        
-        readPacket(qp);   
+        // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
+        // For SNAP and TRUNC the snapshot is needed to save that history
+        boolean snapshotNeeded = true;
+        readPacket(qp);
         LinkedList<Long> packetsCommitted = new LinkedList<Long>();
         LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
         synchronized (zk) {
             if (qp.getType() == Leader.DIFF) {
-                LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));
+                LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
+                snapshotNeeded = false;
             }
             else if (qp.getType() == Leader.SNAP) {
                 LOG.info("Getting a snapshot from leader");
@@ -364,10 +367,13 @@ public class Learner {
             
             long lastQueued = 0;
 
-            // in V1.0 we take a snapshot when we get the NEWLEADER message, but in pre V1.0
-            // we take the snapshot at the UPDATE, since V1.0 also gets the UPDATE (after the NEWLEADER)
+            // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
+            // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER)
             // we need to make sure that we don't take the snapshot twice.
-            boolean snapshotTaken = false;
+            boolean isPreZAB1_0 = true;
+            //If we are not going to take the snapshot be sure the transactions are not applied in memory
+            // but written out to the transaction log
+            boolean writeToTxnLog = !snapshotNeeded;
             // we are now going to start getting transactions to apply followed by an UPTODATE
             outerLoop:
             while (self.isRunning()) {
@@ -387,7 +393,7 @@ public class Learner {
                     packetsNotCommitted.add(pif);
                     break;
                 case Leader.COMMIT:
-                    if (!snapshotTaken) {
+                    if (!writeToTxnLog) {
                         pif = packetsNotCommitted.peekFirst();
                         if (pif.hdr.getZxid() != qp.getZxid()) {
                             LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
@@ -415,7 +421,7 @@ public class Learner {
                                 + Long.toHexString(lastQueued + 1));
                     }
                     lastQueued = packet.hdr.getZxid();
-                    if (!snapshotTaken) {
+                    if (!writeToTxnLog) {
                         // Apply to db directly if we haven't taken the snapshot
                         zk.processTxn(packet.hdr, packet.rec);
                     } else {
@@ -424,13 +430,14 @@ public class Learner {
                     }
                     break;
                 case Leader.UPTODATE:
-                    if (!snapshotTaken) { // true for the pre v1.0 case
+                    if (isPreZAB1_0) {
                         zk.takeSnapshot();
                         self.setCurrentEpoch(newEpoch);
                     }
                     self.cnxnFactory.setZooKeeperServer(zk);                
                     break outerLoop;
-                case Leader.NEWLEADER: // it will be NEWLEADER in v1.0
+                case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
+                    // means this is Zab 1.0
                     // Create updatingEpoch file and remove it after current
                     // epoch is set. QuorumPeer.loadDataBase() uses this file to
                     // detect the case where the server was terminated after
@@ -441,13 +448,16 @@ public class Learner {
                         throw new IOException("Failed to create " +
                                               updating.toString());
                     }
-                    zk.takeSnapshot();
+                    if (snapshotNeeded) {
+                        zk.takeSnapshot();
+                    }
                     self.setCurrentEpoch(newEpoch);
                     if (!updating.delete()) {
                         throw new IOException("Failed to delete " +
                                               updating.toString());
                     }
-                    snapshotTaken = true;
+                    writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
+                    isPreZAB1_0 = false;
                     writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                     break;
                 }
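The interplay of `snapshotNeeded` and `isPreZAB1_0` in `Learner` can be summarized by counting snapshots per sync type and protocol version. This is a hypothetical model (`SnapshotPointSketch` and its enums are invented), assuming a Zab 1.0 leader sends NEWLEADER inside the sync loop while a pre-1.0 leader only sends UPTODATE there.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of where a learner takes snapshots after this patch.
class SnapshotPointSketch {
    enum SyncType { DIFF, SNAP, TRUNC }
    enum Msg { NEWLEADER, UPTODATE }

    /** Counts the snapshots a learner would take while processing msgs. */
    static int snapshotsTaken(SyncType sync, List<Msg> msgs) {
        boolean snapshotNeeded = sync != SyncType.DIFF; // SNAP and TRUNC still snapshot
        boolean isPreZab10 = true;                      // cleared once NEWLEADER shows up in the loop
        int snapshots = 0;
        for (Msg m : msgs) {
            if (m == Msg.NEWLEADER) {
                if (snapshotNeeded) {
                    snapshots++;
                }
                isPreZab10 = false;
            } else {                 // UPTODATE ends the sync phase
                if (isPreZab10) {
                    snapshots++;     // pre-Zab-1.0 leader: snapshot unconditionally
                }
                break;
            }
        }
        return snapshots;
    }

    public static void main(String[] args) {
        List<Msg> zab10 = Arrays.asList(Msg.NEWLEADER, Msg.UPTODATE);
        List<Msg> preZab10 = Arrays.asList(Msg.UPTODATE);
        for (SyncType t : SyncType.values()) {
            System.out.println(t + " zab1.0=" + snapshotsTaken(t, zab10)
                    + " pre1.0=" + snapshotsTaken(t, preZab10));
        }
    }
}
```

Running `main` shows that a DIFF from a Zab 1.0 leader is the only combination that takes no snapshot; with a pre-1.0 leader the snapshot at UPTODATE is still taken regardless of sync type.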

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2de93fe4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
index 52e7d27..3ed6097 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
@@ -19,6 +19,9 @@
 package org.apache.zookeeper.server.quorum;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedReader;
@@ -645,6 +648,8 @@ public class Zab1_0Test {
                 tmpDir.mkdir();
                 File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile();
                 File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
+                //Spy on ZK so we can check if a snapshot happened or not.
+                f.zk = spy(f.zk);
                 try {
                     Assert.assertEquals(0, f.self.getAcceptedEpoch());
                     Assert.assertEquals(0, f.self.getCurrentEpoch());
@@ -687,6 +692,10 @@ public class Zab1_0Test {
                     oa.writeRecord(qp, null);
                     zkDb.serializeSnapshot(oa);
                     oa.writeString("BenWasHere", null);
+                    Thread.sleep(10); //Give it some time to process the snap
+                    //No Snapshot taken yet, the SNAP was applied in memory
+                    verify(f.zk, never()).takeSnapshot();
+
                     qp.setType(Leader.NEWLEADER);
                     qp.setZxid(ZxidUtils.makeZxid(1, 0));
                     oa.writeRecord(qp, null);
@@ -697,7 +706,8 @@ public class Zab1_0Test {
                     Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
                     Assert.assertEquals(1, f.self.getAcceptedEpoch());
                     Assert.assertEquals(1, f.self.getCurrentEpoch());
-                    
+                    //Make sure that we did take the snapshot now
+                    verify(f.zk).takeSnapshot();
                     Assert.assertEquals(firstZxid, f.fzk.getLastProcessedZxid());
                     
                     // Make sure the data was recorded in the filesystem ok
@@ -773,6 +783,8 @@ public class Zab1_0Test {
                 tmpDir.mkdir();
                 File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile();
                 File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
+                //Spy on ZK so we can check if a snapshot happened or not.
+                f.zk = spy(f.zk);
                 try {
                     Assert.assertEquals(0, f.self.getAcceptedEpoch());
                     Assert.assertEquals(0, f.self.getCurrentEpoch());
@@ -839,13 +851,28 @@ public class Zab1_0Test {
                     Assert.assertEquals(1, f.self.getAcceptedEpoch());
                     Assert.assertEquals(1, f.self.getCurrentEpoch());
                     
+                    //Wait for the transactions to be written out. The thread that writes them out
+                    // does not send anything back when it is done.
+                    long start = System.currentTimeMillis();
+                    while (createSessionZxid != f.fzk.getLastProcessedZxid() && (System.currentTimeMillis() - start) < 50) {
+                        Thread.sleep(1);
+                    }
+                    
                     Assert.assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid());
                     
                     // Make sure the data was recorded in the filesystem ok
                     ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
+                    start = System.currentTimeMillis();
                     zkDb2.loadDataBase();
+                    while (zkDb2.getSessionWithTimeOuts().isEmpty() && (System.currentTimeMillis() - start) < 50) {
+                        Thread.sleep(1);
+                        zkDb2.loadDataBase();
+                    }
                     LOG.info("zkdb2 sessions:" + zkDb2.getSessions());
+                    LOG.info("zkdb2 with timeouts:" + zkDb2.getSessionWithTimeOuts());
                     Assert.assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L));
+                    //Snapshot was never taken during very simple sync
+                    verify(f.zk, never()).takeSnapshot();
                 } finally {
                     recursiveDelete(tmpDir);
                 }
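The test changes poll for an asynchronous result under a short deadline instead of sleeping a fixed amount. A hypothetical helper capturing the same pattern (`WaitSketch.waitFor` is not part of ZooKeeper, JUnit or Mockito):

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper that generalizes the bounded polling loops added to Zab1_0Test.
class WaitSketch {
    /**
     * Re-evaluates cond roughly every millisecond until it holds or timeoutMs elapses.
     * Returns whether cond held; the caller still asserts, so a timeout fails the test
     * with the original assertion message.
     */
    static boolean waitFor(BooleanSupplier cond, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!cond.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return cond.getAsBoolean();
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] polls = {0};
        // The condition may have side effects, like reloading a database before checking it.
        boolean ok = waitFor(() -> ++polls[0] >= 3, 1000);
        System.out.println(ok + " after " + polls[0] + " polls"); // true after 3 polls
    }
}
```

Returning a boolean rather than failing inside the helper keeps the existing `Assert` calls as the single point of failure. A condition that reloads `zkDb2` would have to wrap the checked `IOException` thrown by `loadDataBase()`, since `BooleanSupplier` cannot throw it.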

