hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r683707 - in /hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server: ./ quorum/
Date Thu, 07 Aug 2008 20:34:41 GMT
Author: mahadev
Date: Thu Aug  7 13:34:41 2008
New Revision: 683707

URL: http://svn.apache.org/viewvc?rev=683707&view=rev
Log:
ZOOKEEPER-108. Fix sync operation reordering on a Quorum. (Flavio Paiva Junqueira via mahadev)

Modified:
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=683707&r1=683706&r2=683707&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Thu Aug  7 13:34:41 2008
@@ -158,6 +158,7 @@
                 err = rc.err;
                 break;
             case OpCode.sync:
+                LOG.debug("OpCode.sync " + request);
                 SyncRequest syncRequest = new SyncRequest();
                 ZooKeeperServer.byteBuffer2Record(request.request,
                         syncRequest);

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=683707&r1=683706&r2=683707&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Thu Aug  7 13:34:41 2008
@@ -47,11 +47,6 @@
      */
     LinkedList<Request> committedRequests = new LinkedList<Request>();
 
-    /*
-     * Pending sync requests
-     */
-    LinkedList<Request> pendingSyncs = new LinkedList<Request>();
-
     RequestProcessor nextProcessor;
 
     public CommitProcessor(RequestProcessor nextProcessor) {
@@ -127,7 +122,7 @@
                             break;
                         case OpCode.sync:
                             nextPending = request;
-                            pendingSyncs.add(request);
+                            //pendingSyncs.add(request);
                             break;
                         default:
                             toProcess.add(request);
@@ -149,6 +144,7 @@
                          new Exception("committing a null! "));
                 return;
             }
+            LOG.debug("Committing" + request.cxid);
             committedRequests.add(request);
             notifyAll();
         }

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java?rev=683707&r1=683706&r2=683707&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java Thu Aug  7 13:34:41 2008
@@ -64,10 +64,11 @@
                 // the response
                 nextProcessor.processRequest(request);
                 switch (request.type) {
+                case OpCode.sync:
+                    zks.pendingSyncs.add(request);
                 case OpCode.create:
                 case OpCode.delete:
                 case OpCode.setData:
-                case OpCode.sync:
                 case OpCode.setACL:
                 case OpCode.createSession:
                 case OpCode.closeSession:

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java?rev=683707&r1=683706&r2=683707&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java Thu Aug  7 13:34:41 2008
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.log4j.Logger;
 
@@ -50,6 +51,11 @@
 
     SyncRequestProcessor syncProcessor;
 
+    /*
+     * Pending sync requests
+     */
+    ConcurrentLinkedQueue<Request> pendingSyncs;
+    
     /**
      * @param port
      * @param dataDir
@@ -59,6 +65,7 @@
             QuorumPeer self,DataTreeBuilder treeBuilder) throws IOException {
         super(dataDir, dataLogDir, self.tickTime,treeBuilder);
         this.self = self;
+        this.pendingSyncs = new ConcurrentLinkedQueue<Request>();
     }
 
     public Follower getFollower(){
@@ -131,12 +138,12 @@
     }
     
     public void sync(){
-        if(commitProcessor.pendingSyncs.size() ==0){
+        if(pendingSyncs.size() ==0){
             LOG.warn("Not expecting a sync.");
             return;
         }
                 
-        commitProcessor.commit(commitProcessor.pendingSyncs.remove());
+        commitProcessor.commit(pendingSyncs.remove());
     }
              
          

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=683707&r1=683706&r2=683707&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Thu Aug  7 13:34:41 2008
@@ -373,6 +373,11 @@
                         }
                         commit(zxid);
                         zk.commitProcessor.commit(p.request);
+                        if(pendingSyncs.containsKey(zxid)){
+                            sendSync(syncHandler.get(pendingSyncs.get(zxid).sessionId), pendingSyncs.get(zxid));
+                            syncHandler.remove(pendingSyncs.get(zxid));
+                            pendingSyncs.remove(zxid);
+                        }
                     }
                 }
                 return;
@@ -462,12 +467,6 @@
         lastCommitted = zxid;
         QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
         sendPacket(qp);
-               
-        if(pendingSyncs.containsKey(zxid)){
-            sendSync(syncHandler.get(pendingSyncs.get(zxid).sessionId), pendingSyncs.get(zxid));
-            syncHandler.remove(pendingSyncs.get(zxid));
-            pendingSyncs.remove(zxid);
-        }
     }
 
     long lastProposed;
@@ -544,8 +543,14 @@
      */
             
     public void sendSync(FollowerHandler f, Request r){
-        QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null);
-        f.queuePacket(qp);
+        if(f != null){
+            QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null);
+            f.queuePacket(qp);
+        }
+        else{
+            LOG.warn("Committing sync: " + r.cxid );
+            zk.commitProcessor.commit(r);
+        }
     }
                 
     /**

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java?rev=683707&r1=683706&r2=683707&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java Thu Aug  7 13:34:41 2008
@@ -48,14 +48,22 @@
         // request.addRQRec(">prop");
                 
         
+        /* In the following IF-THEN-ELSE block, we process syncs on the leader. 
+         * If the sync is coming from a follower, then the follower
+         * handler adds it to syncHandler. Otherwise, if it is a client of
+         * the leader that issued the sync command, then syncHandler won't 
+         * contain the handler. In this case, we add it to syncHandler, and 
+         * call processRequest on the next processor.
+         */
+        
         if(request.type == ZooDefs.OpCode.sync){
-            if(zks.getLeader().syncHandler.containsKey(request.sessionId)){
-                zks.getLeader().processSync(request);
-            }
-            else{
+            zks.getLeader().processSync(request);
+
+            if(!zks.getLeader().syncHandler.containsKey(request.sessionId)){
+                zks.getLeader().syncHandler.put(request.sessionId, null);
                 nextProcessor.processRequest(request);
-                zks.commitProcessor.commit(request);
             }
+            
         }
         else{
             nextProcessor.processRequest(request);



Mime
View raw message