zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tha...@apache.org
Subject svn commit: r1527760 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/quorum/
Date Mon, 30 Sep 2013 20:20:30 GMT
Author: thawan
Date: Mon Sep 30 20:20:30 2013
New Revision: 1527760

URL: http://svn.apache.org/r1527760
Log:
ZOOKEEPER-1552. Enable sync request processor in Observer (thawan, fpj)

Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1527760&r1=1527759&r2=1527760&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Mon Sep 30 20:20:30 2013
@@ -581,6 +581,8 @@ IMPROVEMENTS:
   ZOOKEEPER-1759. Adding ability to allow READ operations for authenticated users, 
   versus keeping ACLs wide open for READ. (Yuliya Feldman via camille)
 
+  ZOOKEEPER-1552. Enable sync request processor in Observer (thawan, fpj)
+
 Release 3.4.0 - 
 
 Non-backward compatible changes:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java?rev=1527760&r1=1527759&r2=1527760&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java Mon
Sep 30 20:20:30 2013
@@ -27,11 +27,22 @@ import java.util.concurrent.LinkedBlocki
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * This RequestProcessor logs requests to disk. It batches the requests to do
  * the io efficiently. The request is not passed to the next RequestProcessor
  * until its log has been synced to disk.
+ *
+ * SyncRequestProcessor is used in 3 different cases
+ * 1. Leader - Sync request to disk and forward it to AckRequestProcessor which
+ *             send ack back to itself.
+ * 2. Follower - Sync request to disk and forward request to
+ *             SendAckRequestProcessor which send the packets to leader.
+ *             SendAckRequestProcessor is flushable which allow us to force
+ *             push packets to leader.
+ * 3. Observer - Sync committed request to disk (received as INFORM packet).
+ *             It never send ack back to the leader, so the nextProcessor will
+ *             be null. This change the semantic of txnlog on the observer
+ *             since it only contains committed txns.
  */
 public class SyncRequestProcessor extends Thread implements RequestProcessor {
     private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class);
@@ -135,9 +146,11 @@ public class SyncRequestProcessor extend
                         // iff this is a read, and there are no pending
                         // flushes (writes), then just pass this to the next
                         // processor
-                        nextProcessor.processRequest(si);
-                        if (nextProcessor instanceof Flushable) {
-                            ((Flushable)nextProcessor).flush();
+                        if (nextProcessor != null) {
+                            nextProcessor.processRequest(si);
+                            if (nextProcessor instanceof Flushable) {
+                                ((Flushable)nextProcessor).flush();
+                            }
                         }
                         continue;
                     }
@@ -164,9 +177,11 @@ public class SyncRequestProcessor extend
         zks.getZKDatabase().commit();
         while (!toFlush.isEmpty()) {
             Request i = toFlush.remove();
-            nextProcessor.processRequest(i);
+            if (nextProcessor != null) {
+                nextProcessor.processRequest(i);
+            }
         }
-        if (nextProcessor instanceof Flushable) {
+        if (nextProcessor != null && nextProcessor instanceof Flushable) {
             ((Flushable)nextProcessor).flush();
         }
     }
@@ -181,7 +196,9 @@ public class SyncRequestProcessor extend
         } catch(InterruptedException e) {
             LOG.warn("Interrupted while wating for " + this + " to finish");
         }
-        nextProcessor.shutdown();
+        if (nextProcessor != null) {
+            nextProcessor.shutdown();
+        }
     }
 
     public void processRequest(Request request) {

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java?rev=1527760&r1=1527759&r2=1527760&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
(original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
Mon Sep 30 20:20:30 2013
@@ -38,6 +38,13 @@ public class ObserverZooKeeperServer ext
     private static final Logger LOG =
         LoggerFactory.getLogger(ObserverZooKeeperServer.class);        
     
+    /**
+     * Enable since request processor for writing txnlog to disk and
+     * take periodic snapshot. Default is ON.
+     */
+    
+    private boolean syncRequestProcessorEnabled = this.self.getSyncEnabled();
+    
     /*
      * Request processors
      */
@@ -52,6 +59,7 @@ public class ObserverZooKeeperServer ext
 
     ObserverZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb)
throws IOException {
         super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout,
zkDb, self);
+        LOG.info("syncEnabled =" + syncRequestProcessorEnabled);
     }
     
     public Observer getObserver() {
@@ -72,6 +80,10 @@ public class ObserverZooKeeperServer ext
      * @param request
      */
     public void commitRequest(Request request) {     
+        if (syncRequestProcessorEnabled) {
+            // Write to txnlog and take periodic snapshot
+            syncProcessor.processRequest(request);
+        }
         commitProcessor.commit(request);        
     }
     
@@ -90,11 +102,21 @@ public class ObserverZooKeeperServer ext
         commitProcessor.start();
         firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
         ((ObserverRequestProcessor) firstProcessor).start();
-        syncProcessor = new SyncRequestProcessor(this,
-                new SendAckRequestProcessor(getObserver()));
-        syncProcessor.start();
+
+        /*
+         * Observer should write to disk, so that the it won't request
+         * too old txn from the leader which may lead to getting an entire
+         * snapshot.
+         *
+         * However, this may degrade performance as it has to write to disk
+         * and do periodic snapshot which may double the memory requirements
+         */
+        if (syncRequestProcessorEnabled) {
+            syncProcessor = new SyncRequestProcessor(this, null);
+            syncProcessor.start();
+        }
     }
-    
+
     /*
      * Process a sync request
      */

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1527760&r1=1527759&r2=1527760&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Mon Sep
30 20:20:30 2013
@@ -402,6 +402,12 @@ public class QuorumPeer extends Thread i
      * an acknowledgment
      */
     protected int syncLimit;
+    
+    /**
+     * Enables/Disables sync request processor. This option is enabled
+     * by default and is to be used with observers.
+     */
+    protected boolean syncEnabled;
 
     /**
      * The current tick
@@ -1295,6 +1301,35 @@ public class QuorumPeer extends Thread i
     public void setSyncLimit(int syncLimit) {
         this.syncLimit = syncLimit;
     }
+    
+    
+    /**
+     * The syncEnabled can also be set via a system property.
+     */
+    public static final String SYNC_ENABLED = "zookeeper.observer.syncEnabled";
+    
+    /**
+     * Return syncEnabled.
+     * 
+     * @return
+     */
+    public boolean getSyncEnabled() {
+        if (System.getProperty(SYNC_ENABLED) != null) {
+            LOG.info(SYNC_ENABLED + "=" + Boolean.getBoolean(SYNC_ENABLED));   
+            return Boolean.getBoolean(SYNC_ENABLED);
+        } else {        
+            return syncEnabled;
+        }
+    }
+    
+    /**
+     * Set syncEnabled.
+     * 
+     * @param syncEnabled
+     */
+    public void setSyncEnabled(boolean syncEnabled) {
+        this.syncEnabled = syncEnabled;
+    }
 
     /**
      * Gets the election type

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java?rev=1527760&r1=1527759&r2=1527760&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
(original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
Mon Sep 30 20:20:30 2013
@@ -74,6 +74,7 @@ public class QuorumPeerConfig {
     protected QuorumVerifier quorumVerifier = null, lastSeenQuorumVerifier = null;
     protected int snapRetainCount = 3;
     protected int purgeInterval = 0;
+    protected boolean syncEnabled = true;
 
     protected LearnerType peerType = LearnerType.PARTICIPANT;
 
@@ -222,8 +223,10 @@ public class QuorumPeerConfig {
                 {
                     throw new ConfigException("Unrecognised peertype: " + value);
                 }
+            } else if (key.equals( "syncEnabled" )) {
+                syncEnabled = Boolean.parseBoolean(value);
             } else if (key.equals("dynamicConfigFile")){
-               dynamicConfigFileStr = value;
+                dynamicConfigFileStr = value;
             } else if (key.equals("autopurge.snapRetainCount")) {
                 snapRetainCount = Integer.parseInt(value);
             } else if (key.equals("autopurge.purgeInterval")) {
@@ -513,6 +516,10 @@ public class QuorumPeerConfig {
     public int getPurgeInterval() {
         return purgeInterval;
     }
+    
+    public boolean getSyncEnabled() {
+        return syncEnabled;
+    }
 
     public QuorumVerifier getQuorumVerifier() {   
         return quorumVerifier;

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java?rev=1527760&r1=1527759&r2=1527760&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java Mon
Sep 30 20:20:30 2013
@@ -157,6 +157,7 @@ public class QuorumPeerMain {
           quorumPeer.initConfigInZKDatabase();
           quorumPeer.setCnxnFactory(cnxnFactory);
           quorumPeer.setLearnerType(config.getPeerType());
+          quorumPeer.setSyncEnabled(config.getSyncEnabled());
           quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
           
           quorumPeer.start();



Mime
View raw message