Author: mahadev
Date: Fri Nov 20 22:24:36 2009
New Revision: 882739
URL: http://svn.apache.org/viewvc?rev=882739&view=rev
Log:
ZOOKEEPER-582. ZooKeeper can revert to old data when a snapshot is created outside of normal
processing (ben reed and mahadev via mahadev)
Added:
hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/QuorumZxidSyncTest.java
Modified:
hadoop/zookeeper/branches/branch-3.1/CHANGES.txt
hadoop/zookeeper/branches/branch-3.1/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
hadoop/zookeeper/branches/branch-3.1/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/ClientBase.java
hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/QuorumBase.java
Modified: hadoop/zookeeper/branches/branch-3.1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.1/CHANGES.txt?rev=882739&r1=882738&r2=882739&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.1/CHANGES.txt (original)
+++ hadoop/zookeeper/branches/branch-3.1/CHANGES.txt Fri Nov 20 22:24:36 2009
@@ -7,6 +7,8 @@
ZOOKEEPER-562. c client can flood server with pings if tcp send queue
filled. (ben reed via mahadev)
+ ZOOKEEPER-582. ZooKeeper can revert to old data when a snapshot is created
+ outside of normal processing (ben reed and mahadev via mahadev)
Release 3.1.1 - 2009-03-17
Modified: hadoop/zookeeper/branches/branch-3.1/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.1/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=882739&r1=882738&r2=882739&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.1/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ hadoop/zookeeper/branches/branch-3.1/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Fri Nov 20 22:24:36 2009
@@ -172,7 +172,16 @@
writePacket(qp);
readPacket(qp);
long newLeaderZxid = qp.getZxid();
-
+
+ //check to see if the leader zxid is lower than ours
+ //this should never happen but is just a safety check
+ long lastLoggedZxid = sentLastZxid;
+ if ((newLeaderZxid >> 32L) < (lastLoggedZxid >> 32L)) {
+ LOG.fatal("Leader epoch " + Long.toHexString(newLeaderZxid >> 32L)
+ + " is less than our zxid " + Long.toHexString(lastLoggedZxid
>> 32L));
+ throw new IOException("Error: Epoch of leader is lower");
+ }
+
if (qp.getType() != Leader.NEWLEADER) {
LOG.error("First packet should have been NEWLEADER");
throw new IOException("First packet should have been NEWLEADER");
Modified: hadoop/zookeeper/branches/branch-3.1/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.1/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=882739&r1=882738&r2=882739&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.1/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ hadoop/zookeeper/branches/branch-3.1/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Fri Nov 20 22:24:36 2009
@@ -34,6 +34,7 @@
import org.apache.zookeeper.server.NIOServerCnxn;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.Util;
/**
* This class manages the quorum protocol. There are three states this server
@@ -327,9 +328,35 @@
new NIOServerCnxn.Factory(clientPort));
}
- public long getLastLoggedZxid(){
- return logFactory.getLastLoggedZxid();
+ /**
+  * Returns the highest zxid this host has seen, taking into account both
+  * the transaction log and the most recent snapshot.
+  *
+  * @return the larger of the zxid reported by the transaction log and the
+  *         zxid parsed from the most recent snapshot's file name; when no
+  *         snapshot exists or it cannot be located, the log's zxid alone
+  */
+ public long getLastLoggedZxid() {
+     /*
+      * It is possible to have the last zxid recorded only in a snapshot,
+      * with no log related to it. One example is an upgrade, where there is
+      * no corresponding log for the snapshot. Ignoring the snapshot would
+      * make this peer report an older zxid than it actually holds, which can
+      * revert ZooKeeper to old data (ZOOKEEPER-582).
+      *
+      * Start from the log zxid so that a host with logs but no snapshot
+      * still reports what its log contains rather than -1.
+      */
+     long maxZxid = logFactory.getLastLoggedZxid();
+     try {
+         File lastSnapshot = logFactory.findMostRecentSnapshot();
+         if (lastSnapshot != null) {
+             // the snapshot zxid is parsed from its "snapshot"-prefixed file name
+             maxZxid = Math.max(maxZxid,
+                     Util.getZxidFromName(lastSnapshot.getName(), "snapshot"));
+         }
+     } catch (IOException ie) {
+         // best effort: fall back to the zxid from the log alone
+         LOG.warn("Exception finding last snapshot ", ie);
+     }
+     return maxZxid;
}
+
public Follower follower;
public Leader leader;
Modified: hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=882739&r1=882738&r2=882739&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/ClientBase.java Fri Nov 20 22:24:36 2009
@@ -140,6 +140,8 @@
public static boolean waitForServerUp(String hp, long timeout) {
long start = System.currentTimeMillis();
+ // if there are multiple host ports just take the first one
+ hp = hp.split(",")[0];
String split[] = hp.split(":");
String host = split[0];
int port = Integer.parseInt(split[1]);
Modified: hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/QuorumBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/QuorumBase.java?rev=882739&r1=882738&r2=882739&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/QuorumBase.java (original)
+++ hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/QuorumBase.java Fri Nov 20 22:24:36 2009
@@ -99,16 +99,50 @@
LOG.info(hp + " is accepting client connections");
}
}
+
+ /**
+  * Creates, but does not start, the five quorum peers s1-s5 sharing a single
+  * peer map. Peer i is registered in the map at 127.0.0.1:(3180 + i), is
+  * given s&lt;i&gt;dir for both of its directory arguments, and is checked to
+  * serve clients on port 2180 + i.
+  */
+ public void setUpServers() throws Exception {
+     final int tickTime = 2000;
+     final int initLimit = 3;
+     final int syncLimit = 3;
+
+     HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
+     for (int id = 1; id <= 5; id++) {
+         InetSocketAddress quorumAddr = new InetSocketAddress("127.0.0.1", 3180 + id);
+         peers.put(Long.valueOf(id), new QuorumServer(id, quorumAddr));
+     }
+
+     LOG.info("creating QuorumPeer 1");
+     s1 = new QuorumPeer(peers, s1dir, s1dir, 2181, 0, 1,
+             tickTime, initLimit, syncLimit);
+     assertEquals(2181, s1.getClientPort());
+
+     LOG.info("creating QuorumPeer 2");
+     s2 = new QuorumPeer(peers, s2dir, s2dir, 2182, 0, 2,
+             tickTime, initLimit, syncLimit);
+     assertEquals(2182, s2.getClientPort());
+
+     LOG.info("creating QuorumPeer 3");
+     s3 = new QuorumPeer(peers, s3dir, s3dir, 2183, 0, 3,
+             tickTime, initLimit, syncLimit);
+     assertEquals(2183, s3.getClientPort());
+
+     LOG.info("creating QuorumPeer 4");
+     s4 = new QuorumPeer(peers, s4dir, s4dir, 2184, 0, 4,
+             tickTime, initLimit, syncLimit);
+     assertEquals(2184, s4.getClientPort());
+
+     LOG.info("creating QuorumPeer 5");
+     s5 = new QuorumPeer(peers, s5dir, s5dir, 2185, 0, 5,
+             tickTime, initLimit, syncLimit);
+     assertEquals(2185, s5.getClientPort());
+ }
- @After
- @Override
- protected void tearDown() throws Exception {
- LOG.info("TearDown started");
+ /**
+  * Shuts down the quorum peers s1 through s5 by calling shutdown() on each,
+  * in that order. tearDown() calls this before waiting for every server in
+  * hostPort to go down.
+  */
+ public void shutdownServers() throws Exception {
shutdown(s1);
shutdown(s2);
shutdown(s3);
shutdown(s4);
shutdown(s5);
+ }
+
+
+ @After
+ @Override
+ protected void tearDown() throws Exception {
+ LOG.info("TearDown started");
+ shutdownServers();
for (String hp : hostPort.split(",")) {
assertTrue("waiting for server down",
Added: hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/QuorumZxidSyncTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/QuorumZxidSyncTest.java?rev=882739&view=auto
==============================================================================
--- hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/QuorumZxidSyncTest.java (added)
+++ hadoop/zookeeper/branches/branch-3.1/src/java/test/org/apache/zookeeper/test/QuorumZxidSyncTest.java Fri Nov 20 22:24:36 2009
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+
+import junit.framework.TestCase;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test what happens when logs fall behind the snapshots or a follower has a
+ * higher epoch than a leader.
+ */
+public class QuorumZxidSyncTest extends TestCase {
+ QuorumBase qb = new QuorumBase();
+
+ @Before
+ @Override
+ protected void setUp() throws Exception {
+ qb.setUp();
+ }
+
+ @Test
+ /**
+ * find out what happens when a follower connects to leader that is behind
+ */
+ public void testBehindLeader() throws Exception {
+ // crank up the epoch numbers
+ ClientBase.waitForServerUp(qb.hostPort, 10000);
+ ClientBase.waitForServerUp(qb.hostPort, 10000);
+ ZooKeeper zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+ public void process(WatchedEvent event) {
+ }});
+ zk.create("/0", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.close();
+ qb.shutdownServers();
+ qb.startServers();
+ ClientBase.waitForServerUp(qb.hostPort, 10000);
+ qb.tearDown();
+ qb.startServers();
+ ClientBase.waitForServerUp(qb.hostPort, 10000);
+ zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+ public void process(WatchedEvent event) {
+ }});
+ zk.create("/1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.close();
+ qb.shutdownServers();
+ qb.startServers();
+ ClientBase.waitForServerUp(qb.hostPort, 10000);
+ qb.shutdownServers();
+ qb.startServers();
+ ClientBase.waitForServerUp(qb.hostPort, 10000);
+ zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+ public void process(WatchedEvent event) {
+ }});
+ zk.create("/2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.close();
+ qb.shutdownServers();
+ deleteFiles(qb.s1dir);
+ deleteFiles(qb.s2dir);
+ deleteFiles(qb.s3dir);
+ deleteFiles(qb.s4dir);
+ qb.setUpServers();
+ qb.s1.start();
+ qb.s2.start();
+ qb.s3.start();
+ qb.s4.start();
+ assertTrue("Servers didn't come up", ClientBase.waitForServerUp(qb.hostPort, 10000));
+ qb.s5.start();
+ String hostPort = "127.0.0.1:" + qb.s5.getClientPort();
+ assertFalse("Servers came up, but shouldn't have since it's ahead of leader",
+ ClientBase.waitForServerUp(hostPort, 10000));
+ }
+
+
+ private void deleteFiles(File f) {
+ File v = new File(f, "version-2");
+ for(File c: v.listFiles()) {
+ c.delete();
+ }
+ }
+
+ @Test
+ /**
+ * find out what happens when the latest state is in the snapshots not
+ * the logs.
+ */
+ public void testLateLogs() throws Exception {
+ // crank up the epoch numbers
+ ClientBase.waitForServerUp(qb.hostPort, 10000);
+ ClientBase.waitForServerUp(qb.hostPort, 10000);
+ ZooKeeper zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+ public void process(WatchedEvent event) {
+ }});
+ zk.create("/0", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.close();
+ qb.tearDown();
+ qb.startServers();
+ ClientBase.waitForServerUp(qb.hostPort, 10000);
+ qb.tearDown();
+ qb.startServers();
+ ClientBase.waitForServerUp(qb.hostPort, 10000);
+ zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+ public void process(WatchedEvent event) {
+ }});
+ zk.create("/1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.close();
+ qb.tearDown();
+ qb.startServers();
+ ClientBase.waitForServerUp(qb.hostPort, 10000);
+ qb.tearDown();
+ deleteLogs(qb.s1dir);
+ deleteLogs(qb.s2dir);
+ deleteLogs(qb.s3dir);
+ deleteLogs(qb.s4dir);
+ deleteLogs(qb.s5dir);
+ qb.startServers();
+ ClientBase.waitForServerUp(qb.hostPort, 10000);
+ zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+ public void process(WatchedEvent event) {
+ }});
+ zk.create("/2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.close();
+ qb.tearDown();
+ qb.startServers();
+ ClientBase.waitForServerUp(qb.hostPort, 10000);
+ zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+ public void process(WatchedEvent event) {
+ }});
+ boolean saw2 = false;
+ for(String child: zk.getChildren("/", false)) {
+ if (child.equals("2")) {
+ saw2 = true;
+ }
+ }
+ zk.close();
+ assertTrue("Didn't see /2 (went back in time)", saw2);
+ }
+
+ private void deleteLogs(File f) {
+ File v = new File(f, "version-2");
+ for(File c: v.listFiles()) {
+ if (c.getName().startsWith("log")) {
+ c.delete();
+ }
+ }
+ }
+
+ @After
+ @Override
+ public void tearDown() throws Exception {
+ qb.tearDown();
+ }
+}
|