Author: shralex
Date: Sun May 15 21:31:54 2016
New Revision: 1743978
URL: http://svn.apache.org/viewvc?rev=1743978&view=rev
Log:
ZOOKEEPER-2024 Major throughput improvement with mixed workloads (Kfir Lev-Ari via shralex)
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java
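Before the per-file diffs: the core of the change is that the single nextPending / currentlyCommitting slots are replaced by a per-session map of pending requests, so a read from one session no longer waits behind an unrelated session's write. A compile-checkable sketch of just the two shapes involved (the field declarations are copied from the diff below; the enclosing class and the Request stub are placeholders, not the real ZooKeeper types):

    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.concurrent.atomic.AtomicReference;

    class Request {} // placeholder, not org.apache.zookeeper.server.Request

    class PendingModelSketch {
        // Old model: at most one locally submitted write is tracked, and the
        // pipeline stalls until its commit arrives.
        protected final AtomicReference<Request> nextPending =
                new AtomicReference<Request>();

        // New model (this patch): each session id maps to a FIFO of its requests
        // that must wait for a commit; other sessions' requests keep flowing to
        // the worker threads.
        protected final HashMap<Long, LinkedList<Request>> pendingRequests =
                new HashMap<Long, LinkedList<Request>>(10000);
    }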
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1743978&r1=1743977&r2=1743978&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Sun May 15 21:31:54 2016
@@ -306,6 +306,8 @@ BUGFIXES:
(CVE-2014-3488) (Michael Han via phunt)
IMPROVEMENTS:
+ ZOOKEEPER-2024 Major throughput improvement with mixed workloads (Kfir Lev-Ari via shralex)
+
ZOOKEEPER-1660 Documentation for Dynamic Reconfiguration (Reed Wanderman-Milne via shralex)
ZOOKEEPER-2017 New tests for reconfig failure cases (Alexander Shraer and
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=1743978&r1=1743977&r2=1743978&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Sun May 15 21:31:54 2016
@@ -18,8 +18,10 @@
package org.apache.zookeeper.server.quorum;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
@@ -35,7 +37,8 @@ import org.apache.zookeeper.server.ZooKe
* This RequestProcessor matches the incoming committed requests with the
* locally submitted requests. The trick is that locally submitted requests that
* change the state of the system will come back as incoming committed requests,
- * so we need to match them up.
+ * so we need to match them up. Instead of just waiting for the committed requests,
+ * we process the uncommitted requests that belong to other sessions.
*
* The CommitProcessor is multi-threaded. Communication between threads is
* handled via queues, atomics, and wait/notifyAll synchronized on the
@@ -76,9 +79,9 @@ public class CommitProcessor extends Zoo
"zookeeper.commitProcessor.shutdownTimeout";
/**
- * Requests that we are holding until the commit comes in.
+ * Incoming requests.
*/
- protected final LinkedBlockingQueue<Request> queuedRequests =
+ protected LinkedBlockingQueue<Request> queuedRequests =
new LinkedBlockingQueue<Request>();
/**
@@ -87,26 +90,31 @@ public class CommitProcessor extends Zoo
protected final LinkedBlockingQueue<Request> committedRequests =
new LinkedBlockingQueue<Request>();
- /** Request for which we are currently awaiting a commit */
- protected final AtomicReference<Request> nextPending =
- new AtomicReference<Request>();
- /** Request currently being committed (ie, sent off to next processor) */
- private final AtomicReference<Request> currentlyCommitting =
- new AtomicReference<Request>();
+ /**
+ * Requests that we are holding until the commit comes in. Keys represent
+ * session ids; each value is a linked list of the session's requests.
+ */
+ protected final HashMap<Long, LinkedList<Request>> pendingRequests =
+ new HashMap<Long, LinkedList<Request>>(10000);
/** The number of requests currently being processed */
- protected AtomicInteger numRequestsProcessing = new AtomicInteger(0);
+ protected final AtomicInteger numRequestsProcessing = new AtomicInteger(0);
RequestProcessor nextProcessor;
+ /** For testing purposes, we use a separate stopping condition for the
+ * outer loop. */
+ protected volatile boolean stoppedMainLoop = true;
protected volatile boolean stopped = true;
private long workerShutdownTimeoutMS;
protected WorkerService workerPool;
+ private Object emptyPoolSync = new Object();
/**
- * This flag indicates whether we need to wait for a response to come back from the
- * leader or we just let the sync operation flow through like a read. The flag will
- * be true if the CommitProcessor is in a Leader pipeline.
+ * This flag indicates whether we need to wait for a response to come back
+ * from the leader or we just let the sync operation flow through like a
+ * read. The flag will be true if the CommitProcessor is in a Leader
+ * pipeline.
*/
boolean matchSyncs;
@@ -121,14 +129,6 @@ public class CommitProcessor extends Zoo
return numRequestsProcessing.get() != 0;
}
- private boolean isWaitingForCommit() {
- return nextPending.get() != null;
- }
-
- private boolean isProcessingCommit() {
- return currentlyCommitting.get() != null;
- }
-
protected boolean needCommit(Request request) {
switch (request.type) {
case OpCode.create:
@@ -153,94 +153,158 @@ public class CommitProcessor extends Zoo
@Override
public void run() {
- Request request;
try {
- while (!stopped) {
- synchronized(this) {
- while (
- !stopped &&
- ((queuedRequests.isEmpty() || isWaitingForCommit() || isProcessingCommit()) &&
- (committedRequests.isEmpty() || isProcessingRequest()))) {
- wait();
+ /*
+ * In each iteration of the following loop we process at most
+ * requestsToProcess requests of queuedRequests. We have to limit
+ * the number of requests we poll from queuedRequests, since it is
+ * possible to endlessly poll read requests from queuedRequests, and
+ * that would lead to starvation of non-local committed requests.
+ */
+ int requestsToProcess = 0;
+ boolean commitIsWaiting = false;
+ do {
+ /*
+ * Since requests are placed in the queue before being sent to
+ * the leader, if commitIsWaiting = true, the commit belongs to
+ * the first update operation in the queuedRequests or to a
+ * request from a client on another server (i.e., the order of
+ * the following two lines is important!).
+ */
+ commitIsWaiting = !committedRequests.isEmpty();
+ requestsToProcess = queuedRequests.size();
+ // Avoid sync if we have something to do
+ if (requestsToProcess == 0 && !commitIsWaiting){
+ // Waiting for requests to process
+ synchronized (this) {
+ while (!stopped && requestsToProcess == 0
+ && !commitIsWaiting) {
+ wait();
+ commitIsWaiting = !committedRequests.isEmpty();
+ requestsToProcess = queuedRequests.size();
+ }
}
}
-
/*
- * Processing queuedRequests: Process the next requests until we
- * find one for which we need to wait for a commit. We cannot
- * process a read request while we are processing write request.
+ * Processing up to requestsToProcess requests from the incoming
+ * queue (queuedRequests), possibly fewer if a committed request
+ * is present along with a pending local write. After the loop,
+ * we process one committed request if commitIsWaiting.
*/
- while (!stopped && !isWaitingForCommit() &&
- !isProcessingCommit() &&
- (request = queuedRequests.poll()) != null) {
- if (needCommit(request)) {
- nextPending.set(request);
- } else {
+ Request request = null;
+ while (!stopped && requestsToProcess > 0
+ && (request = queuedRequests.poll()) != null) {
+ requestsToProcess--;
+ if (needCommit(request)
+ || pendingRequests.containsKey(request.sessionId)) {
+ // Add request to pending
+ LinkedList<Request> requests = pendingRequests
+ .get(request.sessionId);
+ if (requests == null) {
+ requests = new LinkedList<Request>();
+ pendingRequests.put(request.sessionId, requests);
+ }
+ requests.addLast(request);
+ }
+ else {
sendToNextProcessor(request);
}
+ /*
+ * Stop feeding the pool if there is a local pending update
+ * and a committed request that is ready. Once we have a
+ * pending request with a waiting committed request, we know
+ * we can process the committed one. This is because commits
+ * for local requests arrive in the order they appeared in
+ * the queue, so if we have a pending request and a
+ * committed request, the committed request must be for that
+ * pending write or for a write originating at a different
+ * server.
+ */
+ if (!pendingRequests.isEmpty() && !committedRequests.isEmpty()){
+ /*
+ * We set commitIsWaiting so that we won't check
+ * committedRequests again.
+ */
+ commitIsWaiting = true;
+ break;
+ }
}
- /*
- * Processing committedRequests: check and see if the commit
- * came in for the pending request. We can only commit a
- * request when there is no other request being processed.
- */
- processCommitted();
- }
+ // Handle a single committed request
+ if (commitIsWaiting && !stopped){
+ waitForEmptyPool();
+
+ if (stopped){
+ return;
+ }
+
+ // Process committed head
+ if ((request = committedRequests.poll()) == null) {
+ throw new IOException("Error: committed head is null");
+ }
+
+ /*
+ * Check if request is pending, if so, update it with the
+ * committed info
+ */
+ LinkedList<Request> sessionQueue = pendingRequests
+ .get(request.sessionId);
+ if (sessionQueue != null) {
+ // If session queue != null, then it is also not empty.
+ Request topPending = sessionQueue.poll();
+ if (request.cxid != topPending.cxid) {
+ LOG.error(
+ "Got cxid 0x"
+ + Long.toHexString(request.cxid)
+ + " expected 0x" + Long.toHexString(
+ topPending.cxid)
+ + " for client session id "
+ + Long.toHexString(request.sessionId));
+ throw new IOException("Error: unexpected cxid for"
+ + "client session");
+ }
+ /*
+ * We want to send our version of the request, because it
+ * has the pointer to the client connection (cnxn) set.
+ */
+ topPending.setHdr(request.getHdr());
+ topPending.setTxn(request.getTxn());
+ topPending.zxid = request.zxid;
+ request = topPending;
+ }
+
+ sendToNextProcessor(request);
+
+ waitForEmptyPool();
+
+ /*
+ * Process following reads if any, remove session queue if
+ * empty.
+ */
+ if (sessionQueue != null) {
+ while (!stopped && !sessionQueue.isEmpty()
+ && !needCommit(sessionQueue.peek())) {
+ sendToNextProcessor(sessionQueue.poll());
+ }
+ // Remove empty queues
+ if (sessionQueue.isEmpty()) {
+ pendingRequests.remove(request.sessionId);
+ }
+ }
+ }
+ } while (!stoppedMainLoop);
} catch (Throwable e) {
handleException(this.getName(), e);
}
LOG.info("CommitProcessor exited loop!");
}
- /*
- * Separated this method from the main run loop
- * for test purposes (ZOOKEEPER-1863)
- */
- protected void processCommitted() {
- Request request;
-
- if (!stopped && !isProcessingRequest() &&
- (committedRequests.peek() != null)) {
-
- /*
- * ZOOKEEPER-1863: continue only if there is no new request
- * waiting in queuedRequests or it is waiting for a
- * commit.
- */
- if ( !isWaitingForCommit() && !queuedRequests.isEmpty()) {
- return;
- }
- request = committedRequests.poll();
-
- /*
- * We match with nextPending so that we can move to the
- * next request when it is committed. We also want to
- * use nextPending because it has the cnxn member set
- * properly.
- */
- Request pending = nextPending.get();
- if (pending != null &&
- pending.sessionId == request.sessionId &&
- pending.cxid == request.cxid) {
- // we want to send our version of the request.
- // the pointer to the connection in the request
- pending.setHdr(request.getHdr());
- pending.setTxn(request.getTxn());
- pending.zxid = request.zxid;
- // Set currentlyCommitting so we will block until this
- // completes. Cleared by CommitWorkRequest after
- // nextProcessor returns.
- currentlyCommitting.set(pending);
- nextPending.set(null);
- sendToNextProcessor(pending);
- } else {
- // this request came from someone else so just
- // send the commit packet
- currentlyCommitting.set(request);
- sendToNextProcessor(request);
+ private void waitForEmptyPool() throws InterruptedException {
+ synchronized(emptyPoolSync) {
+ while ((!stopped) && isProcessingRequest()) {
+ emptyPoolSync.wait();
}
- }
+ }
}
@Override
@@ -259,6 +323,7 @@ public class CommitProcessor extends Zoo
"CommitProcWork", numWorkerThreads, true);
}
stopped = false;
+ stoppedMainLoop = false;
super.start();
}
@@ -295,21 +360,8 @@ public class CommitProcessor extends Zoo
try {
nextProcessor.processRequest(request);
} finally {
- // If this request is the commit request that was blocking
- // the processor, clear.
- currentlyCommitting.compareAndSet(request, null);
-
- /*
- * Decrement outstanding request count. The processor may be
- * blocked at the moment because it is waiting for the pipeline
- * to drain. In that case, wake it up if there are pending
- * requests.
- */
- if (numRequestsProcessing.decrementAndGet() == 0) {
- if (!queuedRequests.isEmpty() ||
- !committedRequests.isEmpty()) {
- wakeup();
- }
+ if (numRequestsProcessing.decrementAndGet() == 0){
+ wakeupOnEmpty();
}
}
}
@@ -319,6 +371,12 @@ public class CommitProcessor extends Zoo
notifyAll();
}
+ private void wakeupOnEmpty() {
+ synchronized(emptyPoolSync){
+ emptyPoolSync.notifyAll();
+ }
+ }
+
public void commit(Request request) {
if (stopped || request == null) {
return;
@@ -327,9 +385,7 @@ public class CommitProcessor extends Zoo
LOG.debug("Committing request:: " + request);
}
committedRequests.add(request);
- if (!isProcessingCommit()) {
- wakeup();
- }
+ wakeup();
}
public void processRequest(Request request) {
@@ -340,13 +396,13 @@ public class CommitProcessor extends Zoo
LOG.debug("Processing request:: " + request);
}
queuedRequests.add(request);
- if (!isWaitingForCommit()) {
- wakeup();
- }
+ wakeup();
}
private void halt() {
+ stoppedMainLoop = true;
stopped = true;
+ wakeupOnEmpty();
wakeup();
queuedRequests.clear();
if (workerPool != null) {
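To summarize the new run() loop above: drain a bounded batch from queuedRequests, park writes (and any later request of a session that already has a parked write), then handle exactly one committed request, preferring the session's own copy, and flush the reads queued behind it. The following is a standalone, single-threaded sketch of that scheduling policy only; all names (Req, CommitSchedulingSketch, runOnce) are illustrative stand-ins, not ZooKeeper classes, and the cxid check and hdr/txn/zxid copying done by the real code are omitted:

    import java.util.ArrayDeque;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Queue;

    public class CommitSchedulingSketch {
        static final class Req {
            final long sessionId; final int cxid; final boolean isWrite;
            Req(long s, int c, boolean w) { sessionId = s; cxid = c; isWrite = w; }
            public String toString() { return (isWrite ? "W" : "R") + sessionId + "_" + cxid; }
        }

        final Queue<Req> queuedRequests = new ArrayDeque<Req>();
        final Queue<Req> committedRequests = new ArrayDeque<Req>();
        final Map<Long, ArrayDeque<Req>> pendingRequests = new HashMap<Long, ArrayDeque<Req>>();

        void process(Req r) { System.out.println("processed " + r); }

        // One iteration of the outer do/while in CommitProcessor.run(), simplified.
        void runOnce() {
            // Bound the batch so an endless stream of reads cannot starve commits.
            boolean commitIsWaiting = !committedRequests.isEmpty();
            int requestsToProcess = queuedRequests.size();

            Req request;
            while (requestsToProcess > 0 && (request = queuedRequests.poll()) != null) {
                requestsToProcess--;
                if (request.isWrite || pendingRequests.containsKey(request.sessionId)) {
                    // Park writes, and reads stuck behind a parked write of the same session.
                    pendingRequests.computeIfAbsent(request.sessionId, k -> new ArrayDeque<Req>())
                                   .addLast(request);
                } else {
                    process(request); // reads of other sessions proceed immediately
                }
                if (!pendingRequests.isEmpty() && !committedRequests.isEmpty()) {
                    commitIsWaiting = true; // the waiting commit can now be handled
                    break;
                }
            }

            // Handle a single committed request, preferring the session's own copy.
            if (commitIsWaiting && (request = committedRequests.poll()) != null) {
                ArrayDeque<Req> sessionQueue = pendingRequests.get(request.sessionId);
                if (sessionQueue != null) {
                    request = sessionQueue.poll(); // local write: forward our own object
                }
                process(request);
                // Flush this session's reads that were queued behind the write.
                if (sessionQueue != null) {
                    while (!sessionQueue.isEmpty() && !sessionQueue.peek().isWrite) {
                        process(sessionQueue.poll());
                    }
                    if (sessionQueue.isEmpty()) {
                        pendingRequests.remove(request.sessionId);
                    }
                }
            }
        }

        public static void main(String[] args) {
            CommitSchedulingSketch p = new CommitSchedulingSketch();
            Req w = new Req(1, 2, true);
            p.queuedRequests.add(new Req(1, 1, false)); // R1_1: processed right away
            p.queuedRequests.add(w);                    // W1_2: parked until its commit
            p.queuedRequests.add(new Req(2, 1, false)); // R2_1: left for the next pass
            p.committedRequests.add(w);                 // the commit for W1_2
            p.runOnce();                                // prints R1_1 then W1_2
        }
    }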
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java?rev=1743978&r1=1743977&r2=1743978&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java Sun May 15 21:31:54 2016
@@ -19,21 +19,27 @@
package org.apache.zookeeper.server.quorum;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.GetDataRequest;
-import org.apache.zookeeper.proto.SyncRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.WorkerService;
-import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException;
import org.apache.zookeeper.server.ZooKeeperServerListener;
import org.junit.After;
import org.junit.Assert;
@@ -43,14 +49,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CommitProcessorConcurrencyTest extends ZKTestCase {
- protected static final Logger LOG =
- LoggerFactory.getLogger(CommitProcessorConcurrencyTest.class);
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(CommitProcessorConcurrencyTest.class);
- Boolean executedFlag = false;
+ BlockingQueue<Request> processedRequests;
MockCommitProcessor processor;
+ int defaultSizeOfThreadPool = 16;
@Before
public void setUp() throws Exception {
+ processedRequests = new LinkedBlockingQueue<Request>();
processor = new MockCommitProcessor();
}
@@ -59,84 +67,311 @@ public class CommitProcessorConcurrencyT
processor.shutdown();
}
- class MockCommitProcessor extends CommitProcessor {
-
- MockCommitProcessor() {
- super(
- new RequestProcessor() {
- public void processRequest(Request request)
- throws RequestProcessorException {
- executedFlag = true;
- }
- public void shutdown(){}
- },
- "0",
- false, new ZooKeeperServerListener(){
+ // This queue is infinite if we use "poll" to get requests, but returns a
+ // finite size when asked.
+ class MockRequestsQueue extends LinkedBlockingQueue<Request> {
+ private static final long serialVersionUID = 1L;
+ int readReqId = 0;
+
+ // Always have a request to return.
+ public Request poll() {
+ readReqId++;
+ try {
+ return newRequest(new GetDataRequest("/", false),
+ OpCode.getData, readReqId % 50, readReqId);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ ;
+ return null;
+ }
- @Override
- public void notifyStopping(String errMsg, int exitCode) {
+ // Fixed queue size.
+ public int size() {
+ return 42;
+ }
+ }
- }});
+ class MockCommitProcessor extends CommitProcessor {
+ MockCommitProcessor() {
+ super(new RequestProcessor() {
+ public void processRequest(Request request)
+ throws RequestProcessorException {
+ processedRequests.offer(request);
+ }
+
+ public void shutdown() {
+ }
+ }, "0", false, new ZooKeeperServerListener() {
+
+ @Override
+ public void notifyStopping(String threadName, int errorCode) {
+ }
+ });
}
- public void testStart() {
+ public void initThreads(int poolSize) {
this.stopped = false;
- this.workerPool = new WorkerService(
- "CommitProcWork", 1, true);
+ this.workerPool = new WorkerService("CommitProcWork", poolSize,
+ true);
}
+ }
+
+ private Request newRequest(Record rec, int type, int sessionId, int xid)
+ throws IOException {
+ ByteArrayOutputStream boas = new ByteArrayOutputStream();
+ BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
+ rec.serialize(boa, "request");
+ ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
+ return new Request(null, sessionId, xid, type, bb, new ArrayList<Id>());
+ }
+
+ /**
+ * We place a read request followed by a committed update request of the
+ * same session in queuedRequests. We verify that both requests are
+ * processed in the session's order (first the read, then the write).
+ */
+ @Test
+ public void committedAndUncommittedOfTheSameSessionRaceTest()
+ throws Exception {
+ final String path = "/testCvsUCRace";
+
+ Request readReq = newRequest(new GetDataRequest(path, false),
+ OpCode.getData, 0x0, 0);
+ Request writeReq = newRequest(
+ new SetDataRequest(path, new byte[16], -1), OpCode.setData, 0x0,
+ 1);
+
+ processor.committedRequests.add(writeReq);
+ processor.queuedRequests.add(readReq);
+ processor.queuedRequests.add(writeReq);
+ processor.initThreads(1);
+
+ processor.stoppedMainLoop = true;
+ processor.run();
+
+ Assert.assertTrue(
+ "Request was not processed " + readReq + " instead "
+ + processedRequests.peek(),
+ processedRequests.peek() != null
+ && processedRequests.peek().equals(readReq));
+ processedRequests.poll();
+ Assert.assertTrue(
+ "Request was not processed " + writeReq + " instead "
+ + processedRequests.peek(),
+ processedRequests.peek() != null
+ && processedRequests.peek().equals(writeReq));
+ }
- public void addToCommittedRequests(Request req) {
- this.committedRequests.add(req);
+ /**
+ * Here we create the following requests queue structure: R1_1, W1_2, R1_3,
+ * R2_1, R2_2, W2_3, R2_4, R3_1, R3_2, R3_3, W3_4, R3_5, ... , W5_6, R5_7
+ * i.e., 5 sessions, each with a different number of read requests, followed
+ * by a single write and then a single read. The idea is to check that all
+ * the reads that can be processed concurrently are processed, and that
+ * neither the uncommitted writes nor the reads queued behind them are
+ * processed.
+ */
+ @Test
+ public void processAsMuchUncommittedRequestsAsPossibleTest()
+ throws Exception {
+ final String path = "/testAsMuchAsPossible";
+ LinkedList<Request> shouldBeProcessed = new LinkedList<Request>();
+ HashSet<Request> shouldNotBeProcessed = new HashSet<Request>();
+ for (int sessionId = 1; sessionId <= 5; ++sessionId) {
+ for (int readReqId = 1; readReqId <= sessionId; ++readReqId) {
+ Request readReq = newRequest(new GetDataRequest(path, false),
+ OpCode.getData, sessionId, readReqId);
+ shouldBeProcessed.add(readReq);
+ processor.queuedRequests.add(readReq);
+ }
+ Request writeReq = newRequest(
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create, sessionId, sessionId + 1);
+ Request readReq = newRequest(new GetDataRequest(path, false),
+ OpCode.getData, sessionId, sessionId + 2);
+ processor.queuedRequests.add(writeReq);
+ processor.queuedRequests.add(readReq);
+ shouldNotBeProcessed.add(writeReq);
+ shouldNotBeProcessed.add(readReq);
}
+ processor.initThreads(defaultSizeOfThreadPool);
- public void addToNextPending(Request req) {
- this.nextPending.set(req);
+ processor.stoppedMainLoop = true;
+ processor.run();
+ Thread.sleep(1000);
+ shouldBeProcessed.removeAll(processedRequests);
+ for (Request r : shouldBeProcessed) {
+ LOG.error("Did not process " + r);
}
+ Assert.assertTrue("Not all requests were processed",
+ shouldBeProcessed.isEmpty());
+ Assert.assertFalse("Processed a wrong request",
+ shouldNotBeProcessed.removeAll(processedRequests));
+ }
- public void addToQueuedRequests(Request req) {
- //this.numRequestsProcessing.incrementAndGet();
- this.queuedRequests.add(req);
+ /**
+ * In the following test, we add a write request followed by several read
+ * requests of the same session, and we verify several things - 1. The write
+ * is not processed until its commit arrives. 2. Once the write is processed,
+ * all the read requests are processed as well. 3. All read requests are
+ * executed after the write, before any other write, along with new reads.
+ */
+ @Test
+ public void processAllFollowingUncommittedAfterFirstCommitTest()
+ throws Exception {
+ final String path = "/testUncommittedFollowingCommited";
+ HashSet<Request> shouldBeInPending = new HashSet<Request>();
+ HashSet<Request> shouldBeProcessedAfterPending = new HashSet<Request>();
+
+ Request writeReq = newRequest(
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create, 0x1, 1);
+ processor.queuedRequests.add(writeReq);
+ shouldBeInPending.add(writeReq);
+
+ for (int readReqId = 2; readReqId <= 5; ++readReqId) {
+ Request readReq = newRequest(new GetDataRequest(path, false),
+ OpCode.getData, 0x1, readReqId);
+ processor.queuedRequests.add(readReq);
+ shouldBeInPending.add(readReq);
+ shouldBeProcessedAfterPending.add(readReq);
}
+ processor.initThreads(defaultSizeOfThreadPool);
- public void testProcessCommitted() {
- this.processCommitted();
+ processor.stoppedMainLoop = true;
+ processor.run();
+ Assert.assertTrue("Processed without waiting for commit",
+ processedRequests.isEmpty());
+ Assert.assertTrue("Did not handled all of queuedRequests' requests",
+ processor.queuedRequests.isEmpty());
+
+ shouldBeInPending
+ .removeAll(processor.pendingRequests.get(writeReq.sessionId));
+ for (Request r : shouldBeInPending) {
+ LOG.error("Should be in pending " + r);
}
+ Assert.assertTrue(
+ "Not all requests moved to pending from queuedRequests",
+ shouldBeInPending.isEmpty());
+
+ processor.committedRequests.add(writeReq);
+ processor.stoppedMainLoop = true;
+ processor.run();
+ processor.initThreads(defaultSizeOfThreadPool);
+
+ Thread.sleep(500);
+ Assert.assertTrue("Did not process committed request",
+ processedRequests.peek() == writeReq);
+ Assert.assertTrue("Did not process following read request",
+ processedRequests.containsAll(shouldBeProcessedAfterPending));
+ Assert.assertTrue("Did not process committed request",
+ processor.committedRequests.isEmpty());
+ Assert.assertTrue("Did not process committed request",
+ processor.pendingRequests.isEmpty());
+ }
- @Override
- public void shutdown() {
- this.workerPool.stop();
+ /**
+ * In the following test, we verify that committed requests are processed
+ * even when queuedRequests never gets empty. We add 10 committed requests
+ * and use an infinite queuedRequests queue. We verify that the committed
+ * requests are processed.
+ */
+ @Test(timeout = 1000)
+ public void noStarvationOfNonLocalCommittedRequestsTest() throws Exception {
+ final String path = "/noStarvationOfCommittedRequests";
+ processor.queuedRequests = new MockRequestsQueue();
+ HashSet<Request> nonLocalCommits = new HashSet<Request>();
+ for (int i = 0; i < 10; i++) {
+ Request nonLocalCommitReq = newRequest(
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create, 51, i + 1);
+ processor.committedRequests.add(nonLocalCommitReq);
+ nonLocalCommits.add(nonLocalCommitReq);
+ }
+ for (int i = 0; i < 10; i++) {
+ processor.initThreads(defaultSizeOfThreadPool);
+ processor.stoppedMainLoop = true;
+ processor.run();
}
+ Assert.assertTrue("commit request was not processed",
+ processedRequests.containsAll(nonLocalCommits));
}
- /*
- * We populate the necessary data structures in the CommitProcessor
- * instance and run processCommitted
+ /**
+ * In the following test, we verify that committed writes do not cause
+ * starvation of reads. We populate the commit processor with the following
+ * order of requests: 1 committed local update, 1 read request, 100
+ * committed non-local updates, 50 read requests. We verify that after the
+ * first call to processor.run, only the first write is processed, then
+ * after the second call, all reads are processed along with the second
+ * write.
*/
@Test
- public void raceTest()
- throws Exception {
+ public void noStarvationOfReadRequestsTest() throws Exception {
+ final String path = "/noStarvationOfReadRequests";
- ByteArrayOutputStream boas = new ByteArrayOutputStream();
- BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
- GetDataRequest getReq = new GetDataRequest("/testrace", false);
- getReq.serialize(boa, "request");
- ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
- Request readReq = new Request(null, 0x0, 0, OpCode.getData,
- bb, new ArrayList<Id>());
-
- boas.reset();
- SyncRequest syncReq = new SyncRequest("/testrace");
- syncReq.serialize(boa, "request");
- bb = ByteBuffer.wrap(boas.toByteArray());
- Request writeReq = new Request(null, 0x0, 0, OpCode.sync,
- bb, new ArrayList<Id>());
-
- processor.addToCommittedRequests(writeReq);
- processor.addToQueuedRequests(readReq);
- processor.addToQueuedRequests(writeReq);
-
- processor.testStart();
- processor.testProcessCommitted();
- Assert.assertFalse("Next request processor executed", executedFlag);
+ // +1 committed requests (also head of queuedRequests)
+ Request firstCommittedReq = newRequest(
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create, 0x3, 1);
+ processor.queuedRequests.add(firstCommittedReq);
+ processor.committedRequests.add(firstCommittedReq);
+ HashSet<Request> allReads = new HashSet<Request>();
+
+ // +1 read request to queuedRequests
+ Request firstRead = newRequest(new GetDataRequest(path, false),
+ OpCode.getData, 0x1, 0);
+ allReads.add(firstRead);
+ processor.queuedRequests.add(firstRead);
+
+ // +1 non local commit
+ Request secondCommittedReq = newRequest(
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create, 0x99, 2);
+ processor.committedRequests.add(secondCommittedReq);
+
+ HashSet<Request> waitingCommittedRequests = new HashSet<Request>();
+ // +99 non local committed requests
+ for (int writeReqId = 3; writeReqId < 102; ++writeReqId) {
+ Request writeReq = newRequest(
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create, 0x8, writeReqId);
+ processor.committedRequests.add(writeReq);
+ waitingCommittedRequests.add(writeReq);
+ }
+
+ // +50 read requests to queuedRequests
+ for (int readReqId = 1; readReqId <= 50; ++readReqId) {
+ Request readReq = newRequest(new GetDataRequest(path, false),
+ OpCode.getData, 0x5, readReqId);
+ allReads.add(readReq);
+ processor.queuedRequests.add(readReq);
+ }
+
+ processor.initThreads(defaultSizeOfThreadPool);
+
+ processor.stoppedMainLoop = true;
+ processor.run();
+ Assert.assertTrue("Did not process the first write request",
+ processedRequests.contains(firstCommittedReq));
+ for (Request r : allReads) {
+ Assert.assertTrue("Processed read request",
+ !processedRequests.contains(r));
+ }
+ processor.run();
+ Assert.assertTrue("did not processed all reads",
+ processedRequests.containsAll(allReads));
+ Assert.assertTrue("Did not process the second write request",
+ processedRequests.contains(secondCommittedReq));
+ for (Request r : waitingCommittedRequests) {
+ Assert.assertTrue("Processed additional committed request",
+ !processedRequests.contains(r));
+ }
}
}
\ No newline at end of file
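The tests above rely on the new stoppedMainLoop hook: setting it to true before calling run() makes the outer do/while execute exactly once on the calling thread, so a test can pre-load the queues, run a single scheduling pass, and then assert on processedRequests. A minimal sketch of that driving pattern (readReq and writeReq stand for requests built with the test's newRequest helper; the final assertion is illustrative):

    processor.queuedRequests.add(readReq);     // a read of session 0x0
    processor.queuedRequests.add(writeReq);    // a write of the same session
    processor.committedRequests.add(writeReq); // its commit is already available
    processor.initThreads(1);                  // start a one-thread worker pool
    processor.stoppedMainLoop = true;          // exit after one outer iteration
    processor.run();                           // runs synchronously on the test thread
    Assert.assertEquals(readReq, processedRequests.poll());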
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java?rev=1743978&r1=1743977&r2=1743978&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java Sun May 15 21:31:54 2016
@@ -24,27 +24,24 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.jute.BinaryOutputArchive;
import org.apache.zookeeper.ZKTestCase;
-import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.GetDataRequest;
import org.apache.zookeeper.server.FinalRequestProcessor;
import org.apache.zookeeper.server.PrepRequestProcessor;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
-import org.apache.zookeeper.server.SessionTracker;
import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.quorum.CommitProcessor;
import org.apache.zookeeper.test.ClientBase;
-
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -74,27 +71,52 @@ public class CommitProcessorTest extends
protected static final Logger LOG =
LoggerFactory.getLogger(CommitProcessorTest.class);
-
+ // The time, in ms, each test case should run
+ static final int TEST_RUN_TIME_IN_MS = 5000;
private AtomicInteger processedReadRequests = new AtomicInteger(0);
private AtomicInteger processedWriteRequests = new AtomicInteger(0);
+ boolean stopped;
TestZooKeeperServer zks;
File tmpDir;
ArrayList<TestClientThread> testClients =
- new ArrayList<TestClientThread>();
+ new ArrayList<TestClientThread>();
+ CommitProcessor commitProcessor;
- public void setUp(int numCommitThreads, int numClientThreads)
+ public void setUp(int numCommitThreads, int numClientThreads, int writePercent)
throws Exception {
+ stopped = false;
System.setProperty(
CommitProcessor.ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS,
Integer.toString(numCommitThreads));
- System.setProperty("zookeeper.admin.enableServer", "false");
tmpDir = ClientBase.createTmpDir();
ClientBase.setupTestEnv();
zks = new TestZooKeeperServer(tmpDir, tmpDir, 4000);
zks.startup();
for(int i=0; i<numClientThreads; ++i) {
- TestClientThread client = new TestClientThread();
+ TestClientThread client = new TestClientThread(writePercent);
+ testClients.add(client);
+ client.start();
+ }
+ }
+
+ public void setUp(int numCommitThreads, int numReadOnlyClientThreads, int mixWorkloadClientThreads, int writePercent)
+ throws Exception {
+ stopped = false;
+ System.setProperty(
+ CommitProcessor.ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS,
+ Integer.toString(numCommitThreads));
+ tmpDir = ClientBase.createTmpDir();
+ ClientBase.setupTestEnv();
+ zks = new TestZooKeeperServer(tmpDir, tmpDir, 4000);
+ zks.startup();
+ for(int i=0; i<mixWorkloadClientThreads; ++i) {
+ TestClientThread client = new TestClientThread(writePercent);
+ testClients.add(client);
+ client.start();
+ }
+ for(int i=0; i<numReadOnlyClientThreads; ++i) {
+ TestClientThread client = new TestClientThread(0);
testClients.add(client);
client.start();
}
@@ -103,25 +125,32 @@ public class CommitProcessorTest extends
@After
public void tearDown() throws Exception {
LOG.info("tearDown starting");
+ stopped = true;
+
+ zks.shutdown();
for(TestClientThread client : testClients) {
client.interrupt();
client.join();
}
- zks.shutdown();
-
if (tmpDir != null) {
Assert.assertTrue("delete " + tmpDir.toString(),
ClientBase.recursiveDelete(tmpDir));
}
+ processedReadRequests.set(0);
+ processedWriteRequests.set(0);
+ testClients.clear();
+ commitProcessor.join();
}
private class TestClientThread extends Thread {
long sessionId;
int cxid;
int nodeId;
+ int writePercent;
- public TestClientThread() {
+ public TestClientThread(int writePercent) {
sessionId = zks.getSessionTracker().createSession(5000);
+ this.writePercent = writePercent;
}
public void sendWriteRequest() throws Exception {
@@ -134,7 +163,8 @@ public class CommitProcessorTest extends
ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
Request req = new Request(null, sessionId, ++cxid, OpCode.create,
bb, new ArrayList<Id>());
- zks.firstProcessor.processRequest(req);
+ zks.getFirstProcessor().processRequest(req);
+
}
public void sendReadRequest() throws Exception {
@@ -146,20 +176,20 @@ public class CommitProcessorTest extends
ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
Request req = new Request(null, sessionId, ++cxid, OpCode.getData,
bb, new ArrayList<Id>());
- zks.firstProcessor.processRequest(req);
+ zks.getFirstProcessor().processRequest(req);
}
public void run() {
Random rand = new Random(Thread.currentThread().getId());
try {
sendWriteRequest();
- for(int i=0; i<1000; ++i) {
- // Do 25% write / 75% read request mix
- if (rand.nextInt(100) < 25) {
+ while(!stopped) {
+ if (rand.nextInt(100) < writePercent) {
sendWriteRequest();
} else {
sendReadRequest();
}
+ Thread.sleep(5 + rand.nextInt(95));
}
} catch (Exception e) {
LOG.error("Uncaught exception in test: ", e);
@@ -168,41 +198,85 @@ public class CommitProcessorTest extends
}
@Test
- public void testNoCommitWorkers() throws Exception {
- setUp(0, 10);
- synchronized(this) {
- wait(5000);
+ public void testNoCommitWorkersReadOnlyWorkload() throws Exception {
+ int numClients = 10;
+ LOG.info("testNoCommitWorkersReadOnlyWorkload");
+ setUp(0, numClients, 0);
+ synchronized (this) {
+ wait(TEST_RUN_TIME_IN_MS);
}
+ Assert.assertFalse(fail);
+ Assert.assertTrue("No read requests processed", processedReadRequests.get() > 0);
+ // processedWriteRequests.get() == numClients since each client performs one write at the beginning (creates a znode)
+ Assert.assertTrue("Write requests processed", processedWriteRequests.get() == numClients);
+ }
+
+ @Test
+ public void testNoCommitWorkersMixedWorkload() throws Exception {
+ int numClients = 10;
+ LOG.info("testNoCommitWorkersMixedWorkload 25w/75r workload test");
+ setUp(0, numClients, 25);
+ synchronized (this) {
+ wait(TEST_RUN_TIME_IN_MS);
+ }
+ Assert.assertFalse(fail);
checkProcessedRequest();
+ }
+
+ @Test
+ public void testOneCommitWorkerReadOnlyWorkload() throws Exception {
+ int numClients = 10;
+ LOG.info("testOneCommitWorkerReadOnlyWorkload");
+ setUp(1, numClients, 0);
+ synchronized (this) {
+ wait(TEST_RUN_TIME_IN_MS);
+ }
Assert.assertFalse(fail);
+ Assert.assertTrue("No read requests processed", processedReadRequests.get() > 0);
+ // processedWriteRequests.get() == numClients since each client performs one write at the beginning (creates a znode)
+ Assert.assertTrue("Write requests processed", processedWriteRequests.get() == numClients);
}
@Test
- public void testOneCommitWorker() throws Exception {
- setUp(1, 10);
+ public void testOneCommitWorkerMixedWorkload() throws Exception {
+ setUp(1, 10, 25);
+ LOG.info("testOneCommitWorkerMixedWorkload 25w/75r workload test");
synchronized(this) {
- wait(5000);
+ wait(TEST_RUN_TIME_IN_MS);
}
+ Assert.assertFalse(fail);
checkProcessedRequest();
+ }
+
+
+ @Test
+ public void testManyCommitWorkersReadOnly() throws Exception {
+ int numClients = 10;
+ LOG.info("testManyCommitWorkersReadOnly");
+ setUp(10, numClients, 0);
+ synchronized(this) {
+ wait(TEST_RUN_TIME_IN_MS);
+ }
Assert.assertFalse(fail);
+ Assert.assertTrue("No read requests processed", processedReadRequests.get() > 0);
+ // processedWriteRequests.get() == numClients since each client performs one write at the beginning (creates a znode)
+ Assert.assertTrue("Write requests processed", processedWriteRequests.get() == numClients);
}
-
+
@Test
- public void testManyCommitWorkers() throws Exception {
- setUp(10, 10);
+ public void testManyCommitWorkersMixedWorkload() throws Exception {
+ setUp(16, 8, 8, 25);
+ LOG.info("testManyCommitWorkersMixedWorkload 8X0w/100r + 8X25w/75r workload test");
synchronized(this) {
- wait(5000);
+ wait(TEST_RUN_TIME_IN_MS);
}
- checkProcessedRequest();
Assert.assertFalse(fail);
-
+ checkProcessedRequest();
}
private void checkProcessedRequest() {
- Assert.assertTrue("No read requests processed",
- processedReadRequests.get() > 0);
- Assert.assertTrue("No write requests processed",
- processedWriteRequests.get() > 0);
+ Assert.assertTrue("No read requests processed", processedReadRequests.get() > 0);
+ Assert.assertTrue("No write requests processed", processedWriteRequests.get() > 0);
}
volatile boolean fail = false;
@@ -213,16 +287,13 @@ public class CommitProcessorTest extends
}
private class TestZooKeeperServer extends ZooKeeperServer {
- PrepRequestProcessor firstProcessor;
- CommitProcessor commitProcessor;
-
public TestZooKeeperServer(File snapDir, File logDir, int tickTime)
throws IOException {
super(snapDir, logDir, tickTime);
}
- public SessionTracker getSessionTracker() {
- return sessionTracker;
+ public PrepRequestProcessor getFirstProcessor(){
+ return (PrepRequestProcessor) firstProcessor;
}
// Leader mock: Prep -> MockProposal -> Commit -> validate -> Final
@@ -234,18 +305,17 @@ public class CommitProcessorTest extends
// processor, so it can do pre/post validating of requests
ValidateProcessor validateProcessor =
new ValidateProcessor(finalProcessor);
- commitProcessor = new CommitProcessor(validateProcessor, "1", true,
- getZooKeeperServerListener());
+ commitProcessor = new CommitProcessor(validateProcessor, "1", true, null);
validateProcessor.setCommitProcessor(commitProcessor);
commitProcessor.start();
MockProposalRequestProcessor proposalProcessor =
new MockProposalRequestProcessor(commitProcessor);
proposalProcessor.start();
firstProcessor = new PrepRequestProcessor(zks, proposalProcessor);
- firstProcessor.start();
+ getFirstProcessor().start();
}
}
-
+
private class MockProposalRequestProcessor extends Thread
implements RequestProcessor {
private final CommitProcessor commitProcessor;
@@ -261,9 +331,12 @@ public class CommitProcessorTest extends
Random rand = new Random(Thread.currentThread().getId());
try {
while(true) {
- Request request = proposals.take();
- Thread.sleep(10 + rand.nextInt(190));
- commitProcessor.commit(request);
+ // If it is a read-only test, there will be no proposals.
+ if (!proposals.isEmpty()){
+ Request request = proposals.take();
+ Thread.sleep(5 + rand.nextInt(95));
+ commitProcessor.commit(request);
+ }
}
} catch (InterruptedException e) {
// ignore
@@ -282,11 +355,15 @@ public class CommitProcessorTest extends
@Override
public void shutdown() {
- // TODO Auto-generated method stub
-
+ LOG.info("shutdown MockProposalRequestProcessor");
+ proposals.clear();
+ if (commitProcessor != null) {
+ commitProcessor.shutdown();
+ }
}
}
+
private class ValidateProcessor implements RequestProcessor {
Random rand = new Random(Thread.currentThread().getId());
RequestProcessor nextProcessor;
@@ -310,6 +387,14 @@ public class CommitProcessorTest extends
@Override
public void processRequest(Request request)
throws RequestProcessorException {
+ if (stopped)
+ return;
+ if (request.type == OpCode.closeSession){
+ LOG.debug("ValidateProcessor got closeSession request=" + request);
+ nextProcessor.processRequest(request);
+ return;
+ }
+
boolean isWriteRequest = commitProcessor.needCommit(request);
if (isWriteRequest) {
outstandingWriteRequests.incrementAndGet();
@@ -322,15 +407,14 @@ public class CommitProcessorTest extends
outstandingReadRequests.incrementAndGet();
validateReadRequestVariant(request);
}
-
+
// Insert random delay to test thread race conditions
try {
- Thread.sleep(10 + rand.nextInt(290));
+ Thread.sleep(5 + rand.nextInt(25));
} catch(InterruptedException e) {
// ignore
}
nextProcessor.processRequest(request);
-
/*
* The commit workers will have to execute this line before they
* wake up the commit processor. So this value is up-to-date when
@@ -354,6 +438,8 @@ public class CommitProcessorTest extends
* Validate that this is the only request in the pipeline
*/
private void validateWriteRequestVariant(Request request) {
+ if (stopped)
+ return;
long zxid = request.getHdr().getZxid();
int readRequests = outstandingReadRequests.get();
if (readRequests != 0) {
@@ -384,7 +470,9 @@ public class CommitProcessorTest extends
}
private void validateRequest(Request request) {
- LOG.info("Got request " + request);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got request " + request);
+ }
// Zxids should always be in order for write requests
if (request.getHdr() != null) {
@@ -418,8 +506,12 @@ public class CommitProcessorTest extends
@Override
public void shutdown() {
- // TODO Auto-generated method stub
+ LOG.info("shutdown validateReadRequestVariant");
+ cxidMap.clear();
+ expectedZxid = new AtomicLong(1);
+ if (nextProcessor!=null){
+ nextProcessor.shutdown();
+ }
}
}
-
-}
+}
\ No newline at end of file