Author: breed
Date: Wed Jan 4 00:27:21 2012
New Revision: 1227002
URL: http://svn.apache.org/viewvc?rev=1227002&view=rev
Log:
ZOOKEEPER-1343. getEpochToPropose should check if lastAcceptedEpoch is greater or equal than
epoch
Modified:
zookeeper/branches/branch-3.4/CHANGES.txt
zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
Modified: zookeeper/branches/branch-3.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/CHANGES.txt?rev=1227002&r1=1227001&r2=1227002&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.4/CHANGES.txt Wed Jan 4 00:27:21 2012
@@ -10,6 +10,7 @@ BUGFIXES:
ZOOKEEPER-1345. Add a .gitignore file with general exclusions and
Eclipse project files excluded (Harsh J via phunt)
+ ZOOKEEPER-1343. getEpochToPropose should check if lastAcceptedEpoch is greater or equal
than epoch (fpj via breed)
Release 3.4.2 - 2011-12-21
Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1227002&r1=1227001&r2=1227002&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
(original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
Wed Jan 4 00:27:21 2012
@@ -234,7 +234,7 @@ public class Leader {
*/
final static int INFORM = 8;
- ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long,
Proposal>();
+ ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long,
Proposal>();
ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();
@@ -410,7 +410,7 @@ public class Leader {
}
}
- boolean isShutdown;
+ boolean isShutdown;
/**
* Close down all the LearnerHandlers
@@ -767,54 +767,54 @@ public class Leader {
}
private HashSet<Long> connectingFollowers = new HashSet<Long>();
- public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException,
IOException {
- synchronized(connectingFollowers) {
- if (!waitingForNewEpoch) {
- return epoch;
- }
- if (lastAcceptedEpoch > epoch) {
- epoch = lastAcceptedEpoch+1;
- }
- connectingFollowers.add(sid);
- QuorumVerifier verifier = self.getQuorumVerifier();
- if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers))
-{
- waitingForNewEpoch = false;
- self.setAcceptedEpoch(epoch);
- connectingFollowers.notifyAll();
- } else {
- long start = System.currentTimeMillis();
- long cur = start;
+ public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException,
IOException {
+ synchronized(connectingFollowers) {
+ if (!waitingForNewEpoch) {
+ return epoch;
+ }
+ if (lastAcceptedEpoch >= epoch) {
+ epoch = lastAcceptedEpoch+1;
+ }
+ connectingFollowers.add(sid);
+ QuorumVerifier verifier = self.getQuorumVerifier();
+ if (connectingFollowers.contains(self.getId()) &&
+ verifier.containsQuorum(connectingFollowers))
{
+ waitingForNewEpoch = false;
+ self.setAcceptedEpoch(epoch);
+ connectingFollowers.notifyAll();
+ } else {
+ long start = System.currentTimeMillis();
+ long cur = start;
long end = start + self.getInitLimit()*self.getTickTime();
while(waitingForNewEpoch && cur < end) {
connectingFollowers.wait(end - cur);
cur = System.currentTimeMillis();
}
- if (waitingForNewEpoch) {
+ if (waitingForNewEpoch) {
throw new InterruptedException("Timeout while waiting for epoch from
quorum");
- }
- }
- return epoch;
- }
- }
-
- private HashSet<Long> electingFollowers = new HashSet<Long>();
- private boolean electionFinished = false;
- public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException
{
- synchronized(electingFollowers) {
- if (electionFinished) {
- return;
- }
- if (ss.getCurrentEpoch() != -1) {
- if (ss.isMoreRecentThan(leaderStateSummary)) {
- throw new IOException("Follower is ahead of the leader");
- }
- electingFollowers.add(id);
- }
- QuorumVerifier verifier = self.getQuorumVerifier();
- if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers))
{
- electionFinished = true;
- electingFollowers.notifyAll();
+ }
+ }
+ return epoch;
+ }
+ }
+
+ private HashSet<Long> electingFollowers = new HashSet<Long>();
+ private boolean electionFinished = false;
+ public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException
{
+ synchronized(electingFollowers) {
+ if (electionFinished) {
+ return;
+ }
+ if (ss.getCurrentEpoch() != -1) {
+ if (ss.isMoreRecentThan(leaderStateSummary)) {
+ throw new IOException("Follower is ahead of the leader");
+ }
+ electingFollowers.add(id);
+ }
+ QuorumVerifier verifier = self.getQuorumVerifier();
+ if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers))
{
+ electionFinished = true;
+ electingFollowers.notifyAll();
} else {
long start = System.currentTimeMillis();
long cur = start;
@@ -825,8 +825,8 @@ public class Leader {
}
if (!electionFinished) {
throw new InterruptedException("Timeout while waiting for epoch to be
acked by quorum");
- }
- }
- }
- }
+ }
+ }
+ }
+ }
}
Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1227002&r1=1227001&r2=1227002&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
(original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
Wed Jan 4 00:27:21 2012
@@ -48,6 +48,7 @@ import org.apache.zookeeper.server.Serve
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.server.util.ZxidUtils;
@@ -75,7 +76,26 @@ public class Zab1_0Test {
}
}
}
-
+
+ private static final class MockLeader extends Leader {
+
+ MockLeader(QuorumPeer qp, LeaderZooKeeperServer zk)
+ throws IOException {
+ super(qp, zk);
+ }
+
+ /**
+ * This method returns the value of the variable that holds the epoch
+ * to be proposed and that has been proposed, depending on the point
+ * of the execution in which it is called.
+ *
+ * @return epoch
+ */
+ public long getCurrentEpochToPropose() {
+ return epoch;
+ }
+ }
+
public static final class FollowerMockThread extends Thread {
private final Leader leader;
private final long followerSid;
@@ -144,6 +164,54 @@ public class Zab1_0Test {
}
}
+ /**
+ * In this test, the leader sets the last accepted epoch to 5. The call
+ * to getEpochToPropose should set epoch to 6 and wait until another
+ * follower executes it. If in getEpochToPropose we don't check if
+ * lastAcceptedEpoch == epoch, then the call from the subsequent
+ * follower with lastAcceptedEpoch = 6 doesn't change the value
+ * of epoch, and the test fails. It passes with the fix to predicate.
+ *
+ * {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1343}
+ *
+ *
+ * @throws Exception
+ */
+
+ @Test
+ public void testLastAcceptedEpoch() throws Exception {
+ File tmpDir = File.createTempFile("test", "dir");
+ tmpDir.delete();
+ tmpDir.mkdir();
+ Leader leader = null;
+ LeadThread leadThread = null;
+ try {
+ QuorumPeer peer = createQuorumPeer(tmpDir);
+ leader = createMockLeader(tmpDir, peer);
+ peer.leader = leader;
+ peer.setAcceptedEpoch(5);
+ leadThread = new LeadThread(leader);
+ leadThread.start();
+
+ while(((MockLeader) leader).getCurrentEpochToPropose() != 6){
+ Thread.sleep(20);
+ }
+
+ try {
+ long epoch = leader.getEpochToPropose(1, 6);
+ Assert.assertEquals("New proposed epoch is wrong", 7, epoch);
+ } catch (Exception e){
+ Assert.fail("Timed out in getEpochToPropose");
+ }
+
+ } finally {
+ recursiveDelete(tmpDir);
+ if (leader != null) {
+ leader.shutdown("end of test");
+ }
+ }
+ }
+
@Test
public void testLeaderInElectingFollowers() throws Exception {
File tmpDir = File.createTempFile("test", "dir");
@@ -632,7 +700,19 @@ public class Zab1_0Test {
}
private Leader createLeader(File tmpDir, QuorumPeer peer)
- throws IOException, NoSuchFieldException, IllegalAccessException {
+ throws IOException, NoSuchFieldException, IllegalAccessException{
+ LeaderZooKeeperServer zk = prepareLeader(tmpDir, peer);
+ return new Leader(peer, zk);
+ }
+
+ private Leader createMockLeader(File tmpDir, QuorumPeer peer)
+ throws IOException, NoSuchFieldException, IllegalAccessException{
+ LeaderZooKeeperServer zk = prepareLeader(tmpDir, peer);
+ return new MockLeader(peer, zk);
+ }
+
+ private LeaderZooKeeperServer prepareLeader(File tmpDir, QuorumPeer peer)
+ throws IOException, NoSuchFieldException, IllegalAccessException {
FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
peer.setTxnFactory(logFactory);
Field addrField = peer.getClass().getDeclaredField("myQuorumAddr");
@@ -640,7 +720,7 @@ public class Zab1_0Test {
addrField.set(peer, new InetSocketAddress(33556));
ZKDatabase zkDb = new ZKDatabase(logFactory);
LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, peer, new ZooKeeperServer.BasicDataTreeBuilder(),
zkDb);
- return new Leader(peer, zk);
+ return zk;
}
static class ConversableFollower extends Follower {
|