Author: thawan
Date: Mon Sep 30 20:20:30 2013
New Revision: 1527760
URL: http://svn.apache.org/r1527760
Log:
ZOOKEEPER-1552. Enable sync request processor in Observer (thawan, fpj)
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1527760&r1=1527759&r2=1527760&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Mon Sep 30 20:20:30 2013
@@ -581,6 +581,8 @@ IMPROVEMENTS:
ZOOKEEPER-1759. Adding ability to allow READ operations for authenticated users,
versus keeping ACLs wide open for READ. (Yuliya Feldman via camille)
+ ZOOKEEPER-1552. Enable sync request processor in Observer (thawan, fpj)
+
Release 3.4.0 -
Non-backward compatible changes:
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java?rev=1527760&r1=1527759&r2=1527760&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java Mon Sep 30 20:20:30 2013
@@ -27,11 +27,22 @@ import java.util.concurrent.LinkedBlocki
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* This RequestProcessor logs requests to disk. It batches the requests to do
* the io efficiently. The request is not passed to the next RequestProcessor
* until its log has been synced to disk.
+ *
+ * SyncRequestProcessor is used in 3 different cases:
+ * 1. Leader - Syncs the request to disk and forwards it to AckRequestProcessor,
+ * which sends an ack back to the leader itself.
+ * 2. Follower - Syncs the request to disk and forwards it to
+ * SendAckRequestProcessor, which sends the packets to the leader.
+ * SendAckRequestProcessor is flushable, which allows us to force
+ * pushing packets to the leader.
+ * 3. Observer - Syncs committed requests to disk (received as INFORM packets).
+ * It never sends an ack back to the leader, so the nextProcessor will
+ * be null. This changes the semantics of the txnlog on the observer,
+ * since it only contains committed txns.
*/
public class SyncRequestProcessor extends Thread implements RequestProcessor {
private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class);
@@ -135,9 +146,11 @@ public class SyncRequestProcessor extend
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
- nextProcessor.processRequest(si);
- if (nextProcessor instanceof Flushable) {
- ((Flushable)nextProcessor).flush();
+ if (nextProcessor != null) {
+ nextProcessor.processRequest(si);
+ if (nextProcessor instanceof Flushable) {
+ ((Flushable)nextProcessor).flush();
+ }
}
continue;
}
@@ -164,9 +177,11 @@ public class SyncRequestProcessor extend
zks.getZKDatabase().commit();
while (!toFlush.isEmpty()) {
Request i = toFlush.remove();
- nextProcessor.processRequest(i);
+ if (nextProcessor != null) {
+ nextProcessor.processRequest(i);
+ }
}
- if (nextProcessor instanceof Flushable) {
+ if (nextProcessor != null && nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
@@ -181,7 +196,9 @@ public class SyncRequestProcessor extend
} catch(InterruptedException e) {
LOG.warn("Interrupted while wating for " + this + " to finish");
}
- nextProcessor.shutdown();
+ if (nextProcessor != null) {
+ nextProcessor.shutdown();
+ }
}
public void processRequest(Request request) {
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java?rev=1527760&r1=1527759&r2=1527760&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java Mon Sep 30 20:20:30 2013
@@ -38,6 +38,13 @@ public class ObserverZooKeeperServer ext
private static final Logger LOG =
LoggerFactory.getLogger(ObserverZooKeeperServer.class);
+ /**
+ * Enable the sync request processor for writing the txnlog to disk and
+ * taking periodic snapshots. Default is ON.
+ */
+
+ private boolean syncRequestProcessorEnabled = this.self.getSyncEnabled();
+
/*
* Request processors
*/
@@ -52,6 +59,7 @@ public class ObserverZooKeeperServer ext
ObserverZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb)
throws IOException {
super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout,
zkDb, self);
+ LOG.info("syncEnabled =" + syncRequestProcessorEnabled);
}
public Observer getObserver() {
@@ -72,6 +80,10 @@ public class ObserverZooKeeperServer ext
* @param request
*/
public void commitRequest(Request request) {
+ if (syncRequestProcessorEnabled) {
+ // Write to txnlog and take periodic snapshot
+ syncProcessor.processRequest(request);
+ }
commitProcessor.commit(request);
}
@@ -90,11 +102,21 @@ public class ObserverZooKeeperServer ext
commitProcessor.start();
firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
((ObserverRequestProcessor) firstProcessor).start();
- syncProcessor = new SyncRequestProcessor(this,
- new SendAckRequestProcessor(getObserver()));
- syncProcessor.start();
+
+ /*
+ * The observer should write to disk so that it won't request
+ * txns that are too old from the leader, which may lead to receiving
+ * an entire snapshot.
+ *
+ * However, this may degrade performance, as the observer has to write to
+ * disk and take periodic snapshots, which may double the memory requirements.
+ */
+ if (syncRequestProcessorEnabled) {
+ syncProcessor = new SyncRequestProcessor(this, null);
+ syncProcessor.start();
+ }
}
-
+
/*
* Process a sync request
*/
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1527760&r1=1527759&r2=1527760&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Mon Sep 30 20:20:30 2013
@@ -402,6 +402,12 @@ public class QuorumPeer extends Thread i
* an acknowledgment
*/
protected int syncLimit;
+
+ /**
+ * Enables/Disables sync request processor. This option is enabled
+ * by default and is to be used with observers.
+ */
+ protected boolean syncEnabled;
/**
* The current tick
@@ -1295,6 +1301,35 @@ public class QuorumPeer extends Thread i
public void setSyncLimit(int syncLimit) {
this.syncLimit = syncLimit;
}
+
+
+ /**
+ * The syncEnabled can also be set via a system property.
+ */
+ public static final String SYNC_ENABLED = "zookeeper.observer.syncEnabled";
+
+ /**
+ * Return syncEnabled.
+ *
+ * @return
+ */
+ public boolean getSyncEnabled() {
+ if (System.getProperty(SYNC_ENABLED) != null) {
+ LOG.info(SYNC_ENABLED + "=" + Boolean.getBoolean(SYNC_ENABLED));
+ return Boolean.getBoolean(SYNC_ENABLED);
+ } else {
+ return syncEnabled;
+ }
+ }
+
+ /**
+ * Set syncEnabled.
+ *
+ * @param syncEnabled
+ */
+ public void setSyncEnabled(boolean syncEnabled) {
+ this.syncEnabled = syncEnabled;
+ }
/**
* Gets the election type
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java?rev=1527760&r1=1527759&r2=1527760&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java Mon Sep 30 20:20:30 2013
@@ -74,6 +74,7 @@ public class QuorumPeerConfig {
protected QuorumVerifier quorumVerifier = null, lastSeenQuorumVerifier = null;
protected int snapRetainCount = 3;
protected int purgeInterval = 0;
+ protected boolean syncEnabled = true;
protected LearnerType peerType = LearnerType.PARTICIPANT;
@@ -222,8 +223,10 @@ public class QuorumPeerConfig {
{
throw new ConfigException("Unrecognised peertype: " + value);
}
+ } else if (key.equals( "syncEnabled" )) {
+ syncEnabled = Boolean.parseBoolean(value);
} else if (key.equals("dynamicConfigFile")){
- dynamicConfigFileStr = value;
+ dynamicConfigFileStr = value;
} else if (key.equals("autopurge.snapRetainCount")) {
snapRetainCount = Integer.parseInt(value);
} else if (key.equals("autopurge.purgeInterval")) {
@@ -513,6 +516,10 @@ public class QuorumPeerConfig {
public int getPurgeInterval() {
return purgeInterval;
}
+
+ public boolean getSyncEnabled() {
+ return syncEnabled;
+ }
public QuorumVerifier getQuorumVerifier() {
return quorumVerifier;
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java?rev=1527760&r1=1527759&r2=1527760&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java Mon Sep 30 20:20:30 2013
@@ -157,6 +157,7 @@ public class QuorumPeerMain {
quorumPeer.initConfigInZKDatabase();
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setLearnerType(config.getPeerType());
+ quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
quorumPeer.start();
|