Repository: zookeeper
Updated Branches:
refs/heads/branch-3.4 0666919fe -> e0af6ed75
ZOOKEEPER-2845: Apply commit log when restarting server.
This is the version of #453 for the 3.4 branch
Author: Robert Evans <evans@yahoo-inc.com>
Reviewers: Abraham Fine <afine@apache.org>, Mark Fenes <mfenes@cloudera.com>,
Andor Molnár <andor@cloudera.com>, Kishor Patil <kpatil@yahoo-inc.com>
Closes #455 from revans2/ZOOKEEPER-2845-3.4
Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/e0af6ed7
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/e0af6ed7
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/e0af6ed7
Branch: refs/heads/branch-3.4
Commit: e0af6ed7598fc4555d7625ddc8efd86e2281babf
Parents: 0666919
Author: Robert Evans <evans@yahoo-inc.com>
Authored: Fri Feb 23 15:18:55 2018 -0800
Committer: Abraham Fine <afine@apache.org>
Committed: Fri Feb 23 15:18:55 2018 -0800
----------------------------------------------------------------------
.../org/apache/zookeeper/server/ZKDatabase.java | 41 ++-
.../zookeeper/server/ZooKeeperServer.java | 24 +-
.../server/persistence/FileTxnSnapLog.java | 16 ++
.../server/quorum/QuorumPeerMainTest.java | 257 ++++++++++++++-----
4 files changed, 254 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e0af6ed7/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
index 72357b7..ce36422 100644
--- a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
+++ b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
@@ -201,7 +201,12 @@ public class ZKDatabase {
return sessionsWithTimeouts;
}
-
+ private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() {
+ public void onTxnLoaded(TxnHeader hdr, Record txn){
+ addCommittedProposal(hdr, txn);
+ }
+ };
+
/**
* load the database from the disk onto memory and also add
* the transactions to the committedlog in memory.
@@ -209,22 +214,30 @@ public class ZKDatabase {
* @throws IOException
*/
public long loadDataBase() throws IOException {
- PlayBackListener listener=new PlayBackListener(){
- public void onTxnLoaded(TxnHeader hdr,Record txn){
- Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
- null, null);
- r.txn = txn;
- r.hdr = hdr;
- r.zxid = hdr.getZxid();
- addCommittedProposal(r);
- }
- };
-
- long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
+ long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
initialized = true;
return zxid;
}
-
+
+ /**
+ * Fast forward the database adding transactions from the committed log into memory.
+ * @return the last valid zxid.
+ * @throws IOException
+ */
+ public long fastForwardDataBase() throws IOException {
+ long zxid = snapLog.fastForwardFromEdits(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
+ initialized = true;
+ return zxid;
+ }
+
+ private void addCommittedProposal(TxnHeader hdr, Record txn) {
+ Request r = new Request(null, 0, hdr.getCxid(), hdr.getType(), null, null);
+ r.txn = txn;
+ r.hdr = hdr;
+ r.zxid = hdr.getZxid();
+ addCommittedProposal(r);
+ }
+
/**
* maintains a list of last <i>committedLog</i>
* or so committed requests. This is used for
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e0af6ed7/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
index cbf5fa5..977adc2 100644
--- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -512,14 +512,24 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider
{
firstProcessor.shutdown();
}
- if (fullyShutDown && zkDb != null) {
- zkDb.clear();
+ if (zkDb != null) {
+ if (fullyShutDown) {
+ zkDb.clear();
+ } else {
+ // else there is no need to clear the database
+ // * When a new quorum is established we can still apply the diff
+ // on top of the same zkDb data
+ // * If we fetch a new snapshot from leader, the zkDb will be
+ // cleared anyway before loading the snapshot
+ try {
+ //This will fast forward the database to the latest recorded transactions
+ zkDb.fastForwardDataBase();
+ } catch (IOException e) {
+ LOG.error("Error updating DB", e);
+ zkDb.clear();
+ }
+ }
}
- // else there is no need to clear the database
- // * When a new quorum is established we can still apply the diff
- // on top of the same zkDb data
- // * If we fetch a new snapshot from leader, the zkDb will be
- // cleared anyway before loading the snapshot
unregisterJMX();
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e0af6ed7/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
index b261a8e..0ba8491 100644
--- a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
+++ b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
@@ -168,6 +168,22 @@ public class FileTxnSnapLog {
public long restore(DataTree dt, Map<Long, Integer> sessions,
PlayBackListener listener) throws IOException {
snapLog.deserialize(dt, sessions);
+ return fastForwardFromEdits(dt, sessions, listener);
+ }
+
+ /**
+ * This function will fast forward the server database to have the latest
+ * transactions in it. This is the same as restore, but only reads from
+ * the transaction logs and not restores from a snapshot.
+ * @param dt the datatree to write transactions to.
+ * @param sessions the sessions to be restored.
+ * @param listener the playback listener to run on the
+ * database transactions.
+ * @return the highest zxid restored.
+ * @throws IOException
+ */
+ public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
+ PlayBackListener listener) throws IOException {
FileTxnLog txnLog = new FileTxnLog(dataDir);
TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
long highestZxid = dt.lastProcessedZxid;
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/e0af6ed7/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index 0399c97..cd09a36 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -50,6 +50,7 @@ import org.apache.log4j.WriterAppender;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
@@ -254,15 +255,8 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
numServers = 3;
servers = LaunchServers(numServers);
String path = "/hzxidtest";
- int leader=-1;
+ int leader = servers.findLeader();
- // find the leader
- for (int i=0; i < numServers; i++) {
- if (servers.mt[i].main.quorumPeer.leader != null) {
- leader = i;
- }
- }
-
// make sure there is a leader
Assert.assertTrue("There should be a leader", leader >=0);
@@ -371,12 +365,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
servers = LaunchServers(numServers, 500);
// find the leader
- int trueLeader = -1;
- for (int i = 0; i < numServers; i++) {
- if (servers.mt[i].main.quorumPeer.leader != null) {
- trueLeader = i;
- }
- }
+ int trueLeader = servers.findLeader();
Assert.assertTrue("There should be a leader", trueLeader >= 0);
// find a follower
@@ -437,27 +426,36 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
}
private void waitForOne(ZooKeeper zk, States state) throws InterruptedException {
- while(zk.getState() != state) {
- Thread.sleep(500);
- }
+ int iterations = ClientBase.CONNECTION_TIMEOUT / 500;
+ while (zk.getState() != state) {
+ if (iterations-- == 0) {
+ throw new RuntimeException("Waiting too long " + zk.getState() + " != " + state);
+ }
+ Thread.sleep(500);
+ }
}
- private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException {
- int iterations = 10;
- boolean someoneNotConnected = true;
- while(someoneNotConnected) {
- if (iterations-- == 0) {
- ClientBase.logAllStackTraces();
- throw new RuntimeException("Waiting too long");
- }
-
- someoneNotConnected = false;
- for(ZooKeeper zk: zks) {
- if (zk.getState() != state) {
- someoneNotConnected = true;
- }
- }
- Thread.sleep(1000);
+ private void waitForAll(Servers servers, States state) throws InterruptedException {
+ waitForAll(servers.zk, state);
+ }
+
+ private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException {
+ int iterations = ClientBase.CONNECTION_TIMEOUT / 1000;
+ boolean someoneNotConnected = true;
+ while (someoneNotConnected) {
+ if (iterations-- == 0) {
+ ClientBase.logAllStackTraces();
+ throw new RuntimeException("Waiting too long");
+ }
+
+ someoneNotConnected = false;
+ for (ZooKeeper zk : zks) {
+ if (zk.getState() != state) {
+ someoneNotConnected = true;
+ break;
+ }
+ }
+ Thread.sleep(1000);
}
}
@@ -465,48 +463,79 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
private class Servers {
MainThread mt[];
ZooKeeper zk[];
+ int[] clientPorts;
+
+ public void shutDownAllServers() throws InterruptedException {
+ for (MainThread t: mt) {
+ t.shutdown();
+ }
+ }
+
+ public void restartAllServersAndClients(Watcher watcher) throws IOException {
+ for (MainThread t : mt) {
+ if (!t.isAlive()) {
+ t.start();
+ }
+ }
+ for (int i = 0; i < zk.length; i++) {
+ restartClient(i, watcher);
+ }
+ }
+
+ public void restartClient(int clientIndex, Watcher watcher) throws IOException {
+ zk[clientIndex] = new ZooKeeper("127.0.0.1:" + clientPorts[clientIndex], ClientBase.CONNECTION_TIMEOUT, watcher);
+ }
+
+ public int findLeader() {
+ for (int i = 0; i < mt.length; i++) {
+ if (mt[i].main.quorumPeer.leader != null) {
+ return i;
+ }
+ }
+ return -1;
+ }
}
private Servers LaunchServers(int numServers) throws IOException, InterruptedException
{
return LaunchServers(numServers, null);
}
- /** * This is a helper function for launching a set of servers
+ /**
+ * This is a helper function for launching a set of servers
*
- * @param numServers* @param tickTime A ticktime to pass to MainThread
+ * @param numServers the number of servers
+ * @param tickTime A ticktime to pass to MainThread
* @return
* @throws IOException
* @throws InterruptedException
*/
private Servers LaunchServers(int numServers, Integer tickTime) throws IOException, InterruptedException
{
- int SERVER_COUNT = numServers;
- Servers svrs = new Servers();
- final int clientPorts[] = new int[SERVER_COUNT];
- StringBuilder sb = new StringBuilder();
- for(int i = 0; i < SERVER_COUNT; i++) {
- clientPorts[i] = PortAssignment.unique();
- sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+"\n");
- }
- String quorumCfgSection = sb.toString();
-
- MainThread mt[] = new MainThread[SERVER_COUNT];
- ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
- for (int i = 0; i < SERVER_COUNT; i++) {
- if (tickTime != null) {
- mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection, new HashMap<String, String>(), tickTime);
- } else {
- mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection);
- }
- mt[i].start();
- zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
- }
-
- waitForAll(zk, States.CONNECTED);
-
- svrs.mt = mt;
- svrs.zk = zk;
- return svrs;
- }
+ int SERVER_COUNT = numServers;
+ Servers svrs = new Servers();
+ svrs.clientPorts = new int[SERVER_COUNT];
+ StringBuilder sb = new StringBuilder();
+ for(int i = 0; i < SERVER_COUNT; i++) {
+ svrs.clientPorts[i] = PortAssignment.unique();
+ sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+"\n");
+ }
+ String quorumCfgSection = sb.toString();
+
+ svrs.mt = new MainThread[SERVER_COUNT];
+ svrs.zk = new ZooKeeper[SERVER_COUNT];
+ for(int i = 0; i < SERVER_COUNT; i++) {
+ if (tickTime != null) {
+ svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection, new HashMap<String, String>(), tickTime);
+ } else {
+ svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection);
+ }
+ svrs.mt[i].start();
+ svrs.restartClient(i, this);
+ }
+
+ waitForAll(svrs, States.CONNECTED);
+
+ return svrs;
+ }
/**
@@ -1120,4 +1149,106 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
return qp;
}
}
+
+ @Test
+ public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
+ final int LEADER_TIMEOUT_MS = 10000;
+ // 1. start up server and wait for leader election to finish
+ ClientBase.setupTestEnv();
+ final int SERVER_COUNT = 3;
+ servers = LaunchServers(SERVER_COUNT);
+
+ waitForAll(servers, States.CONNECTED);
+
+ // we need to shutdown and start back up to make sure that the create session isn't the first transaction since
+ // that is rather innocuous.
+ servers.shutDownAllServers();
+ waitForAll(servers, States.CONNECTING);
+ servers.restartAllServersAndClients(this);
+ waitForAll(servers, States.CONNECTED);
+
+ // 2. kill all followers
+ int leader = servers.findLeader();
+ Map<Long, Proposal> outstanding = servers.mt[leader].main.quorumPeer.leader.outstandingProposals;
+ // increase the tick time to delay the leader going to looking
+ servers.mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS;
+ LOG.warn("LEADER " + leader);
+
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ if (i != leader) {
+ servers.mt[i].shutdown();
+ }
+ }
+
+ // 3. start up the followers to form a new quorum
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ if (i != leader) {
+ servers.mt[i].start();
+ }
+ }
+
+ // 4. wait one of the follower to be the new leader
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ if (i != leader) {
+ // Recreate a client session since the previous session was not persisted.
+ servers.restartClient(i, this);
+ waitForOne(servers.zk[i], States.CONNECTED);
+ }
+ }
+
+ // 5. send a create request to old leader and make sure it's synced to disk,
+ // which means it acked from itself
+ try {
+ servers.zk[leader].create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ Assert.fail("create /zk" + leader + " should have failed");
+ } catch (KeeperException e) {
+ }
+
+ // just make sure that we actually did get it in process at the
+ // leader
+ Assert.assertEquals(1, outstanding.size());
+ Proposal p = outstanding.values().iterator().next();
+ Assert.assertEquals(OpCode.create, p.request.hdr.getType());
+
+ // make sure it has a chance to write it to disk
+ int sleepTime = 0;
+ Long longLeader = new Long(leader);
+ while (!p.ackSet.contains(longLeader)) {
+ if (sleepTime > 2000) {
+ Assert.fail("Transaction not synced to disk within 1 second " + p.ackSet
+ + " expected " + leader);
+ }
+ Thread.sleep(100);
+ sleepTime += 100;
+ }
+
+ // 6. wait for the leader to quit due to not enough followers and come back up as a part of the new quorum
+ LOG.info("Waiting for leader " + leader + " to timeout followers");
+ sleepTime = 0;
+ Follower f = servers.mt[leader].main.quorumPeer.follower;
+ while (f == null || !f.isRunning()) {
+ if (sleepTime > LEADER_TIMEOUT_MS * 2) {
+ Assert.fail("Took too long for old leader to time out " + servers.mt[leader].main.quorumPeer.getPeerState());
+ }
+ Thread.sleep(100);
+ sleepTime += 100;
+ f = servers.mt[leader].main.quorumPeer.follower;
+ }
+
+ int newLeader = servers.findLeader();
+ // make sure a different leader was elected
+ Assert.assertTrue(leader != newLeader);
+
+ // 7. restart the previous leader to force it to replay the edits and possibly come up in a bad state
+ servers.mt[leader].shutdown();
+ servers.mt[leader].start();
+ waitForAll(servers, States.CONNECTED);
+
+ // 8. check the node exist in previous leader but not others
+ // make sure everything is consistent
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ Assert.assertNull("server " + i + " should not have /zk" + leader, servers.zk[i].exists("/zk"
+ leader, false));
+ }
+ }
}
|