zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r1238178 - in /zookeeper/branches/branch-3.3: ./ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/quorum/
Date Tue, 31 Jan 2012 06:54:55 GMT
Author: mahadev
Date: Tue Jan 31 06:54:55 2012
New Revision: 1238178

URL: http://svn.apache.org/viewvc?rev=1238178&view=rev
Log:
ZOOKEEPER-1367. Data inconsistencies and unexpired ephemeral nodes after cluster restart.
(Benjamin Reed via mahadev)

Modified:
    zookeeper/branches/branch-3.3/CHANGES.txt
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
    zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java

Modified: zookeeper/branches/branch-3.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/CHANGES.txt?rev=1238178&r1=1238177&r2=1238178&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.3/CHANGES.txt Tue Jan 31 06:54:55 2012
@@ -12,6 +12,9 @@ BUGFIXES:
   ZOOKEEPER-973. bind() could fail on Leader because it does not
   setReuseAddress on its ServerSocket (Harsh J via phunt)
 
+  ZOOKEEPER-1367. Data inconsistencies and unexpired ephemeral nodes 
+  after cluster restart. (Benjamin Reed via mahadev)
+
 Release 3.3.4 - 2011-11-16
 Backward compatible changes:
 

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=1238178&r1=1238177&r2=1238178&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Tue Jan 31 06:54:55 2012
@@ -54,6 +54,7 @@ import org.apache.zookeeper.server.NIOSe
 import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
 import org.apache.zookeeper.txn.CreateSessionTxn;
 import org.apache.zookeeper.txn.ErrorTxn;
+import org.apache.zookeeper.txn.TxnHeader;
 
 /**
  * This Request processor actually applies any transaction associated with a
@@ -100,20 +101,10 @@ public class FinalRequestProcessor imple
                 }
             }
             if (request.hdr != null) {
-                rc = zks.getZKDatabase().processTxn(request.hdr, request.txn);
-                if (request.type == OpCode.createSession) {
-                    if (request.txn instanceof CreateSessionTxn) {
-                        CreateSessionTxn cst = (CreateSessionTxn) request.txn;
-                        zks.sessionTracker.addSession(request.sessionId, cst
-                                .getTimeOut());
-                    } else {
-                        LOG.warn("*****>>>>> Got "
-                                + request.txn.getClass() + " "
-                                + request.txn.toString());
-                    }
-                } else if (request.type == OpCode.closeSession) {
-                    zks.sessionTracker.removeSession(request.sessionId);
-                }
+               TxnHeader hdr = request.hdr;
+               Record txn = request.txn;
+
+               rc = zks.processTxn(hdr, txn);
             }
             // do not add non quorum packets to the queue.
             if (Request.isQuorum(request.type)) {

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=1238178&r1=1238177&r2=1238178&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Tue Jan 31 06:54:55 2012
@@ -41,9 +41,12 @@ import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.StatPersisted;
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.proto.RequestHeader;
+import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
 import org.apache.zookeeper.server.SessionTracker.Session;
 import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.txn.CreateSessionTxn;
+import org.apache.zookeeper.txn.TxnHeader;
 
 /**
  * This class implements a simple standalone ZooKeeperServer. It sets up the
@@ -93,7 +96,6 @@ public class ZooKeeperServer implements 
     protected int maxSessionTimeout = -1;
     protected SessionTracker sessionTracker;
     private FileTxnSnapLog txnLogFactory = null;
-    private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
     private ZKDatabase zkDb;
     protected long hzxid = 0;
     public final static Exception ok = new Exception("No prob");
@@ -240,8 +242,7 @@ public class ZooKeeperServer implements 
         // Clean up dead sessions
         LinkedList<Long> deadSessions = new LinkedList<Long>();
         for (long session : zkDb.getSessions()) {
-            sessionsWithTimeouts = zkDb.getSessionWithTimeOuts();
-            if (sessionsWithTimeouts.get(session) == null) {
+          if (zkDb.getSessionWithTimeOuts().get(session) == null) {
                 deadSessions.add(session);
             }
         }
@@ -368,7 +369,10 @@ public class ZooKeeperServer implements 
     }
     
     public void startup() {        
-        createSessionTracker();
+        if (sessionTracker == null) {
+            createSessionTracker();
+        }
+        startSessionTracker();
         setupRequestProcessors();
 
         registerJMX();
@@ -391,6 +395,9 @@ public class ZooKeeperServer implements 
     protected void createSessionTracker() {
         sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
                 tickTime, 1);
+    }
+    
+    protected void startSessionTracker() {
         ((SessionTrackerImpl)sessionTracker).start();
     }
 
@@ -697,6 +704,27 @@ public class ZooKeeperServer implements 
     public String getState() {
         return "standalone";
     }
+    
+    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
+        ProcessTxnResult rc;
+        int opCode = hdr.getType();
+        long sessionId = hdr.getClientId();
+        rc = getZKDatabase().processTxn(hdr, txn);
+        if (opCode == OpCode.createSession) {
+            if (txn instanceof CreateSessionTxn) {
+                CreateSessionTxn cst = (CreateSessionTxn) txn;
+                sessionTracker.addSession(sessionId, cst
+                        .getTimeOut());
+            } else {
+                LOG.warn("*****>>>>> Got "
+                        + txn.getClass() + " "
+                        + txn.toString());
+            }
+        } else if (opCode == OpCode.closeSession) {
+            sessionTracker.removeSession(sessionId);
+        }
+        return rc;
+    }
 
     public void dumpEphemerals(PrintWriter pwriter) {
     	zkDb.dumpEphemerals(pwriter);

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java?rev=1238178&r1=1238177&r2=1238178&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java Tue Jan 31 06:54:55 2012
@@ -77,9 +77,13 @@ public class LeaderZooKeeperServer exten
     }
     
     @Override
-    protected void createSessionTracker() {
+    public void createSessionTracker() {
         sessionTracker = new SessionTrackerImpl(this, getZKDatabase().getSessionWithTimeOuts(),
                 tickTime, self.getId());
+    }
+    
+    @Override
+    protected void startSessionTracker() {
         ((SessionTrackerImpl)sessionTracker).start();
     }
 

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1238178&r1=1238177&r2=1238178&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Tue Jan 31 06:54:55 2012
@@ -321,6 +321,7 @@ public class Learner {       
 
             }
             zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
+            zk.createSessionTracker();
             if(LOG.isInfoEnabled()){
                 LOG.info("Setting leader epoch " + Long.toHexString(newLeaderZxid >> 32L));
             }
@@ -351,7 +352,7 @@ public class Learner {       
                     if (pif.hdr.getZxid() != qp.getZxid()) {
                         LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                     } else {
-                        zk.getZKDatabase().processTxn(pif.hdr, pif.rec);
+                        zk.processTxn(pif.hdr, pif.rec);
                         packetsNotCommitted.remove();
                     }
                     break;
@@ -360,7 +361,7 @@ public class Learner {       
                     ia = BinaryInputArchive
                             .getArchive(new ByteArrayInputStream(qp.getData()));
                     Record txn = SerializeUtils.deserializeTxn(ia, hdr);
-                    zk.getZKDatabase().processTxn(hdr, txn);
+                    zk.processTxn(hdr, txn);
                     break;
                 case Leader.UPTODATE:
                     zk.takeSnapshot();

Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java?rev=1238178&r1=1238177&r2=1238178&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java Tue Jan 31 06:54:55 2012
@@ -70,12 +70,15 @@ public abstract class LearnerZooKeeperSe
     }    
     
     @Override
-    protected void createSessionTracker() {
+    public void createSessionTracker() {
         sessionTracker = new LearnerSessionTracker(this, getZKDatabase().getSessionWithTimeOuts(),
                 self.getId());
     }
     
     @Override
+    protected void startSessionTracker() {}
+    
+    @Override
     protected void revalidateSession(ServerCnxn cnxn, long sessionId,
             int sessionTimeout) throws IOException, InterruptedException {
         getLearner().validateSession(cnxn, sessionId, sessionTimeout);



Mime
View raw message