Author: mahadev
Date: Tue Aug 25 05:47:20 2009
New Revision: 807486
URL: http://svn.apache.org/viewvc?rev=807486&view=rev
Log:
ZOOKEEPER-408. proposals and commits for DIFF and Truncate messages from the leader to followers
is buggy. (mahadev and ben via mahadev)
Modified:
hadoop/zookeeper/branches/branch-3.2/CHANGES.txt
hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/ClientBase.java
hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/QuorumTest.java
Modified: hadoop/zookeeper/branches/branch-3.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/CHANGES.txt?rev=807486&r1=807485&r2=807486&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.2/CHANGES.txt (original)
+++ hadoop/zookeeper/branches/branch-3.2/CHANGES.txt Tue Aug 25 05:47:20 2009
@@ -61,6 +61,9 @@
ZOOKEEPER-498. Unending Leader Elections : WAN configuration (flavio via
mahadev)
+ ZOOKEEPER-408. proposals and commits for DIFF and Truncate messages from the
+ leader to followers is buggy. (mahadev and ben via mahadev)
+
IMPROVEMENTS:
NEW FEATURES:
Modified: hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java?rev=807486&r1=807485&r2=807486&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
(original)
+++ hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
Tue Aug 25 05:47:20 2009
@@ -24,6 +24,7 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
+import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
@@ -271,8 +272,8 @@
*/
public boolean truncate(long zxid) throws IOException {
FileTxnIterator itr = new FileTxnIterator(this.logDir, zxid);
- FileInputStream input = itr.inputStream;
- long pos = input.getChannel().position();
+ PositionInputStream input = itr.inputStream;
+ long pos = input.getPosition();
// now, truncate at the current position
RandomAccessFile raf=new RandomAccessFile(itr.logFile,"rw");
raf.setLength(pos);
@@ -322,6 +323,48 @@
}
/**
+ * A stream wrapper that keeps track of the position
+ * in the input stream. The position is the number of bytes
+ * that have been consumed by the application, i.e. the offset
+ * of the next byte to be read. It can wrap buffered input
+ * streams to provide the right offset for the application.
+ */
+ static class PositionInputStream extends FilterInputStream {
+     /** Number of bytes handed to the caller (read or skipped) so far. */
+     long position;
+
+     /**
+      * @param in the stream to wrap; it may itself be a buffered stream
+      */
+     protected PositionInputStream(InputStream in) {
+         super(in);
+     }
+
+     /**
+      * Reads one byte and advances the position if a byte was returned.
+      *
+      * @return the byte value (0..255), or -1 at end of stream
+      */
+     @Override
+     public int read() throws IOException {
+         int rc = super.read();
+         // -1 is the only "nothing was read" value; a byte whose value
+         // is 0 is a real byte and has to be counted as well
+         if (rc >= 0) {
+             position++;
+         }
+         return rc;
+     }
+
+     /**
+      * Reads up to len bytes and advances the position by the number of
+      * bytes actually read. FilterInputStream.read(byte[]) delegates to
+      * this method, so that overload is tracked here too.
+      *
+      * @return the number of bytes read, or -1 at end of stream
+      */
+     @Override
+     public int read(byte[] b, int off, int len) throws IOException {
+         int rc = super.read(b, off, len);
+         // rc is -1 at end of stream; adding it blindly would move the
+         // position backwards
+         if (rc > 0) {
+             position += rc;
+         }
+         return rc;
+     }
+
+     /**
+      * Skips up to n bytes and advances the position by the number of
+      * bytes actually skipped.
+      *
+      * @return the number of bytes skipped
+      */
+     @Override
+     public long skip(long n) throws IOException {
+         long rc = super.skip(n);
+         if (rc > 0) {
+             position += rc;
+         }
+         return rc;
+     }
+
+     /**
+      * Note: mark()/reset() are not overridden here, so a reset() on the
+      * wrapped stream is not reflected in the returned value.
+      *
+      * @return the number of bytes consumed through this stream so far
+      */
+     public long getPosition() {
+         return position;
+     }
+ }
+
+ /**
* this class implements the txnlog iterator interface
* which is used for reading the transaction logs
*/
@@ -333,7 +376,8 @@
File logFile;
InputArchive ia;
static final String CRC_ERROR="CRC check failed";
- FileInputStream inputStream=null;
+
+ PositionInputStream inputStream=null;
//stored files is the list of files greater than
//the zxid we are looking for.
private ArrayList<File> storedFiles;
@@ -398,7 +442,7 @@
* @param is the inputstream
* @throws IOException
*/
- protected void inStreamCreated(InputArchive ia, FileInputStream is)
+ protected void inStreamCreated(InputArchive ia, InputStream is)
throws IOException{
FileHeader header= new FileHeader();
header.deserialize(ia, "fileheader");
@@ -416,9 +460,9 @@
**/
protected InputArchive createInputArchive(File logFile) throws IOException {
if(inputStream==null){
- inputStream= new FileInputStream(logFile);
+ inputStream= new PositionInputStream(new BufferedInputStream(new FileInputStream(logFile)));
LOG.debug("Created new input stream " + logFile);
- ia = BinaryInputArchive.getArchive(new BufferedInputStream(inputStream));
+ ia = BinaryInputArchive.getArchive(inputStream);
inStreamCreated(ia,inputStream);
LOG.debug("created new input archive " + logFile);
}
Modified: hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=807486&r1=807485&r2=807486&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
(original)
+++ hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
Tue Aug 25 05:47:20 2009
@@ -200,7 +200,7 @@
readPacket(qp);
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
- LOG.info("Getting a diff from the leader!");
+ LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));
zk.loadData();
}
else if (qp.getType() == Leader.SNAP) {
Modified: hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java?rev=807486&r1=807485&r2=807486&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
(original)
+++ hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
Tue Aug 25 05:47:20 2009
@@ -62,7 +62,7 @@
long getSid(){
return sid;
- }
+ }
/**
* The packets to be sent to the follower
@@ -224,15 +224,23 @@
} else {
this.sid = leader.followerCounter.getAndDecrement();
}
- LOG.info("The follower sid: " + this.sid);
+ LOG.info("Follower sid: " + this.sid + " : info : "
+ + leader.self.quorumPeers.get(this.sid));
+ /* this is the last zxid from the follower but the leader might have to
+ restart the follower from a different zxid depending on truncate and diff.
*/
long peerLastZxid = qp.getZxid();
-
+ /* the default to send to the follower */
int packetToSend = Leader.SNAP;
boolean logTxns = true;
-
long zxidToSend = 0;
- // we are sending the diff
+
+ /** the packets that the follower needs to get updates from **/
+ long updates = peerLastZxid;
+
+ /* Check whether we have proposals in memory so that we can send a
+ * diff to the follower instead of a full snapshot.
+ */
synchronized(leader.zk.committedLog) {
if (leader.zk.committedLog.size() != 0) {
if ((leader.zk.maxCommittedLog >= peerLastZxid)
@@ -252,16 +260,7 @@
}
else {
logTxns = false;
- } }
- long leaderLastZxid = leader.startForwarding(this, peerLastZxid);
- QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
- leaderLastZxid, null, null);
- oa.writeRecord(newLeaderQP, "packet");
- bufferedOutput.flush();
- // a special case when both the ids are the same
- if (peerLastZxid == leaderLastZxid) {
- packetToSend = Leader.DIFF;
- zxidToSend = leaderLastZxid;
+ }
}
//check if we decided to send a diff or we need to send a truncate
// we avoid using epochs for truncating because epochs make things
@@ -274,11 +273,31 @@
// we can ask the follower to truncate the log
packetToSend = Leader.TRUNC;
zxidToSend = leader.zk.maxCommittedLog;
-
+ updates = zxidToSend;
}
+
+ /* see what other packets from the proposal
+ * and tobeapplied queues need to be sent
+ * and then decide if we can just send a DIFF
+ * or we actually need to send the whole snapshot
+ */
+ long leaderLastZxid = leader.startForwarding(this, updates);
+ // a special case when both the ids are the same
+ if (peerLastZxid == leaderLastZxid) {
+ packetToSend = Leader.DIFF;
+ zxidToSend = leaderLastZxid;
+ }
+
+ QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+ leaderLastZxid, null, null);
+ oa.writeRecord(newLeaderQP, "packet");
+ bufferedOutput.flush();
+
+
oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
bufferedOutput.flush();
- // only if we are not truncating or fast sycning
+
+ /* if we are not truncating or sending a diff just send a snapshot */
if (packetToSend == Leader.SNAP) {
LOG.warn("Sending snapshot last zxid of peer is 0x"
+ Long.toHexString(peerLastZxid) + " "
@@ -289,7 +308,7 @@
oa.writeString("BenWasHere", "signature");
}
bufferedOutput.flush();
- //
+
// Mutation packets will be queued during the serialize,
// so we need to mark when the follower can actually start
// using the data
Modified: hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=807486&r1=807485&r2=807486&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
(original)
+++ hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
Tue Aug 25 05:47:20 2009
@@ -647,8 +647,7 @@
}
handler.queuePacket(p.packet);
// Since the proposal has been committed we need to send the
- // commit message
- // also
+ // commit message also
QuorumPacket qp = new QuorumPacket(Leader.COMMIT, p.packet
.getZxid(), null, null);
handler.queuePacket(qp);
Modified: hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java?rev=807486&r1=807485&r2=807486&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
(original)
+++ hadoop/zookeeper/branches/branch-3.2/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
Tue Aug 25 05:47:20 2009
@@ -43,13 +43,33 @@
try {
follower.writePacket(qp, false);
} catch (IOException e) {
- LOG.warn("Ignoring unexpected exception during packet send", e);
+ LOG.warn("Closing connection to leader, exception during packet send", e);
+ try {
+ if (!follower.sock.isClosed()) {
+ follower.sock.close();
+ }
+ } catch (IOException e1) {
+ // Nothing to do, we are shutting things down, so an exception here is irrelevant
+ LOG.debug("Ignoring error closing the connection", e1);
+ }
}
}
}
public void flush() throws IOException {
- follower.writePacket(null, true);
+ try {
+ follower.writePacket(null, true);
+ } catch(IOException e) {
+ LOG.warn("Closing connection to leader, exception during packet send", e);
+ try {
+ if (!follower.sock.isClosed()) {
+ follower.sock.close();
+ }
+ } catch (IOException e1) {
+ // Nothing to do, we are shutting things down, so an exception here is irrelevant
+ LOG.debug("Ignoring error closing the connection", e1);
+ }
+ }
}
public void shutdown() {
Modified: hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=807486&r1=807485&r2=807486&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/ClientBase.java
(original)
+++ hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/ClientBase.java
Tue Aug 25 05:47:20 2009
@@ -247,7 +247,7 @@
File tmpFile = File.createTempFile("test", ".junit", parentDir);
// don't delete tmpFile - this ensures we don't attempt to create
// a tmpDir with a duplicate name
-
+ tmpFile.delete();
File tmpDir = new File(tmpFile + ".dir");
assertFalse(tmpDir.exists()); // never true if tmpfile does it's job
assertTrue(tmpDir.mkdirs());
@@ -356,7 +356,7 @@
return JMXEnv.conn();
}
- private static boolean recursiveDelete(File d) {
+ public static boolean recursiveDelete(File d) {
if (d.isDirectory()) {
File children[] = d.listFiles();
for (File f : children) {
Modified: hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=807486&r1=807485&r2=807486&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/QuorumTest.java
(original)
+++ hadoop/zookeeper/branches/branch-3.2/src/java/test/org/apache/zookeeper/test/QuorumTest.java
Tue Aug 25 05:47:20 2009
@@ -25,12 +25,16 @@
import junit.framework.TestCase;
import org.apache.log4j.Logger;
+import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.FollowerHandler;
+import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.Before;
import org.junit.Test;
@@ -51,7 +55,7 @@
protected void tearDown() throws Exception {
qb.tearDown();
}
-
+
@Test
public void testDeleteWithChildren() throws Exception {
ct.testDeleteWithChildren();
@@ -92,6 +96,7 @@
{
ct.testClientWithWatcherObj();
}
+
@Test
public void testMultipleWatcherObjs() throws IOException,
InterruptedException, KeeperException
@@ -99,6 +104,56 @@
ct.testMutipleWatcherObjs();
}
+ // Number of setData callbacks that have completed (incremented in the
+ // callbacks below). NOTE(review): ++ on a volatile is a read-modify-write
+ // and not atomic; presumably all callbacks run on one client thread - confirm.
+ volatile int counter = 0;
+ // Number of setData callbacks that completed with a non-zero result code.
+ // Neither counter is asserted on inside this method.
+ volatile int errors = 0;
+ /**
+ * Issues a burst of asynchronous writes, shuts down the input side of the
+ * socket of every follower handler the leader is forwarding to, issues a
+ * second burst of writes and then checks that all five quorum peers are
+ * still alive.
+ */
+ @Test
+ public void testLeaderShutdown() throws IOException, InterruptedException, KeeperException
{
+ // client with a watcher that ignores every event
+ ZooKeeper zk = new DisconnectableZooKeeper(qb.hostPort, ClientBase.CONNECTION_TIMEOUT,
new Watcher() {
+ public void process(WatchedEvent event) {
+ }});
+ zk.create("/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/blah/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ // pick the first of the five peers that has a non-null leader field
+ Leader leader = qb.s1.leader;
+ if (leader == null) leader = qb.s2.leader;
+ if (leader == null) leader = qb.s3.leader;
+ if (leader == null) leader = qb.s4.leader;
+ if (leader == null) leader = qb.s5.leader;
+ assertNotNull(leader);
+ // first burst: 5000 asynchronous writes queued before the sockets are touched
+ for(int i = 0; i < 5000; i++) {
+ zk.setData("/blah/blah", new byte[0], -1, new AsyncCallback.StatCallback() {
+ public void processResult(int rc, String path, Object ctx,
+ Stat stat) {
+ counter++;
+ if (rc != 0) {
+ errors++;
+ }
+ }
+ }, null);
+ }
+ // iterate over a copy of the leader's forwardingFollowers list and shut
+ // down the input side of each handler's socket
+ ArrayList<FollowerHandler> fhs = new ArrayList<FollowerHandler>(leader.forwardingFollowers);
+ for(FollowerHandler f: fhs) {
+ f.sock.shutdownInput();
+ }
+ // second burst: 5000 more asynchronous writes after the sockets were shut down
+ for(int i = 0; i < 5000; i++) {
+ zk.setData("/blah/blah", new byte[0], -1, new AsyncCallback.StatCallback() {
+ public void processResult(int rc, String path, Object ctx,
+ Stat stat) {
+ counter++;
+ if (rc != 0) {
+ errors++;
+ }
+ }
+ }, null);
+ }
+ // check if all the followers are alive
+ assertTrue(qb.s1.isAlive());
+ assertTrue(qb.s2.isAlive());
+ assertTrue(qb.s3.isAlive());
+ assertTrue(qb.s4.isAlive());
+ assertTrue(qb.s5.isAlive());
+ zk.close();
+ }
+
/**
* Make sure that we can change sessions
* from follower to leader.
@@ -171,6 +226,5 @@
}
zk.close();
}
-
- // skip superhammer and clientcleanup as they are too expensive for quorum
+ // skip superhammer and clientcleanup as they are too expensive for quorum
}
|