zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shra...@apache.org
Subject svn commit: r1743978 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/
Date Sun, 15 May 2016 21:31:54 GMT
Author: shralex
Date: Sun May 15 21:31:54 2016
New Revision: 1743978

URL: http://svn.apache.org/viewvc?rev=1743978&view=rev
Log:
ZOOKEEPER-2024 Major throughput improvement with mixed workloads (Kfir Lev-Ari via shralex)

Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1743978&r1=1743977&r2=1743978&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Sun May 15 21:31:54 2016
@@ -306,6 +306,8 @@ BUGFIXES:
   (CVE-2014-3488) (Michael Han via phunt)
 
 IMPROVEMENTS:
+  ZOOKEEPER-2024 Major throughput improvement with mixed workloads (Kfir Lev-Ari via shralex)
+
   ZOOKEEPER-1660 Documentation for Dynamic Reconfiguration (Reed Wanderman-Milne via shralex)  
 
   ZOOKEEPER-2017 New tests for reconfig failure cases (Alexander Shraer and

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=1743978&r1=1743977&r2=1743978&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Sun May 15 21:31:54 2016
@@ -18,8 +18,10 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.slf4j.Logger;
@@ -35,7 +37,8 @@ import org.apache.zookeeper.server.ZooKe
  * This RequestProcessor matches the incoming committed requests with the
  * locally submitted requests. The trick is that locally submitted requests that
  * change the state of the system will come back as incoming committed requests,
- * so we need to match them up.
+ * so we need to match them up. Instead of just waiting for the committed requests, 
+ * we process the uncommitted requests that belong to other sessions.
  *
  * The CommitProcessor is multi-threaded. Communication between threads is
  * handled via queues, atomics, and wait/notifyAll synchronized on the
@@ -76,9 +79,9 @@ public class CommitProcessor extends Zoo
         "zookeeper.commitProcessor.shutdownTimeout";
 
     /**
-     * Requests that we are holding until the commit comes in.
+     * Incoming requests.
      */
-    protected final LinkedBlockingQueue<Request> queuedRequests =
+    protected LinkedBlockingQueue<Request> queuedRequests =
         new LinkedBlockingQueue<Request>();
 
     /**
@@ -87,26 +90,31 @@ public class CommitProcessor extends Zoo
     protected final LinkedBlockingQueue<Request> committedRequests =
         new LinkedBlockingQueue<Request>();
 
-    /** Request for which we are currently awaiting a commit */
-    protected final AtomicReference<Request> nextPending =
-        new AtomicReference<Request>();
-    /** Request currently being committed (ie, sent off to next processor) */
-    private final AtomicReference<Request> currentlyCommitting =
-        new AtomicReference<Request>();
+    /**
+     * Requests that we are holding until commit comes in. Keys represent
+     * session ids, each value is a linked list of the session's requests.
+     */
+    protected final HashMap<Long, LinkedList<Request>> pendingRequests =
+            new HashMap<Long, LinkedList<Request>>(10000);
 
     /** The number of requests currently being processed */
-    protected AtomicInteger numRequestsProcessing = new AtomicInteger(0);
+    protected final AtomicInteger numRequestsProcessing = new AtomicInteger(0);
 
     RequestProcessor nextProcessor;
 
+    /** For testing purposes, we use a separated stopping condition for the
+     * outer loop.*/
+    protected volatile boolean stoppedMainLoop = true; 
     protected volatile boolean stopped = true;
     private long workerShutdownTimeoutMS;
     protected WorkerService workerPool;
+    private Object emptyPoolSync = new Object();
 
     /**
-     * This flag indicates whether we need to wait for a response to come back from the
-     * leader or we just let the sync operation flow through like a read. The flag will
-     * be true if the CommitProcessor is in a Leader pipeline.
+     * This flag indicates whether we need to wait for a response to come back
+     * from the leader or we just let the sync operation flow through like a
+     * read. The flag will be true if the CommitProcessor is in a Leader
+     * pipeline.
      */
     boolean matchSyncs;
 
@@ -121,14 +129,6 @@ public class CommitProcessor extends Zoo
         return numRequestsProcessing.get() != 0;
     }
 
-    private boolean isWaitingForCommit() {
-        return nextPending.get() != null;
-    }
-
-    private boolean isProcessingCommit() {
-        return currentlyCommitting.get() != null;
-    }
-
     protected boolean needCommit(Request request) {
         switch (request.type) {
             case OpCode.create:
@@ -153,94 +153,158 @@ public class CommitProcessor extends Zoo
 
     @Override
     public void run() {
-        Request request;
         try {
-            while (!stopped) {
-                synchronized(this) {
-                    while (
-                        !stopped &&
-                        ((queuedRequests.isEmpty() || isWaitingForCommit() || isProcessingCommit()) &&
-                         (committedRequests.isEmpty() || isProcessingRequest()))) {
-                        wait();
+            /*
+             * In each iteration of the following loop we process at most
+             * requestsToProcess requests of queuedRequests. We have to limit
+             * the number of request we poll from queuedRequests, since it is
+             * possible to endlessly poll read requests from queuedRequests, and
+             * that will lead to a starvation of non-local committed requests.
+             */
+            int requestsToProcess = 0;
+            boolean commitIsWaiting = false;
+			do {
+                /*
+                 * Since requests are placed in the queue before being sent to
+                 * the leader, if commitIsWaiting = true, the commit belongs to
+                 * the first update operation in the queuedRequests or to a
+                 * request from a client on another server (i.e., the order of
+                 * the following two lines is important!).
+                 */
+                commitIsWaiting = !committedRequests.isEmpty();
+                requestsToProcess =  queuedRequests.size();
+                // Avoid sync if we have something to do
+                if (requestsToProcess == 0 && !commitIsWaiting){
+                    // Waiting for requests to process
+                    synchronized (this) {
+                        while (!stopped && requestsToProcess == 0
+                                && !commitIsWaiting) {
+                            wait();
+                            commitIsWaiting = !committedRequests.isEmpty();
+                            requestsToProcess = queuedRequests.size();
+                        }
                     }
                 }
-
                 /*
-                 * Processing queuedRequests: Process the next requests until we
-                 * find one for which we need to wait for a commit. We cannot
-                 * process a read request while we are processing write request.
+                 * Processing up to requestsToProcess requests from the incoming
+                 * queue (queuedRequests), possibly less if a committed request
+                 * is present along with a pending local write. After the loop,
+                 * we process one committed request if commitIsWaiting.
                  */
-                while (!stopped && !isWaitingForCommit() &&
-                       !isProcessingCommit() &&
-                       (request = queuedRequests.poll()) != null) {
-                    if (needCommit(request)) {
-                        nextPending.set(request);
-                    } else {
+                Request request = null;
+                while (!stopped && requestsToProcess > 0
+                        && (request = queuedRequests.poll()) != null) {
+                    requestsToProcess--;
+                    if (needCommit(request)
+                            || pendingRequests.containsKey(request.sessionId)) {
+                        // Add request to pending
+                        LinkedList<Request> requests = pendingRequests
+                                .get(request.sessionId);
+                        if (requests == null) {
+                            requests = new LinkedList<Request>();
+                            pendingRequests.put(request.sessionId, requests);
+                        }
+                        requests.addLast(request);
+                    }
+                    else {
                         sendToNextProcessor(request);
                     }
+                    /*
+                     * Stop feeding the pool if there is a local pending update
+                     * and a committed request that is ready. Once we have a
+                     * pending request with a waiting committed request, we know
+                     * we can process the committed one. This is because commits
+                     * for local requests arrive in the order they appeared in
+                     * the queue, so if we have a pending request and a
+                     * committed request, the committed request must be for that
+                     * pending write or for a write originating at a different
+                     * server.
+                     */
+                    if (!pendingRequests.isEmpty() && !committedRequests.isEmpty()){
+                        /*
+                         * We set commitIsWaiting so that we won't check
+                         * committedRequests again.
+                         */
+                        commitIsWaiting = true;
+                        break;
+                    }
                 }
 
-                /*
-                 * Processing committedRequests: check and see if the commit
-                 * came in for the pending request. We can only commit a
-                 * request when there is no other request being processed.
-                 */
-                processCommitted();
-            }
+                // Handle a single committed request
+                if (commitIsWaiting && !stopped){
+                    waitForEmptyPool();
+
+                    if (stopped){
+                        return;
+                    }
+
+                    // Process committed head
+                    if ((request = committedRequests.poll()) == null) {
+                        throw new IOException("Error: committed head is null");
+                    }
+
+                    /*
+                     * Check if request is pending, if so, update it with the
+                     * committed info
+                     */
+                    LinkedList<Request> sessionQueue = pendingRequests
+                            .get(request.sessionId);
+                    if (sessionQueue != null) {
+                        // If session queue != null, then it is also not empty.
+                        Request topPending = sessionQueue.poll();
+                        if (request.cxid != topPending.cxid) {
+                            LOG.error(
+                                    "Got cxid 0x"
+                                            + Long.toHexString(request.cxid)
+                                            + " expected 0x" + Long.toHexString(
+                                                    topPending.cxid)
+                                    + " for client session id "
+                                    + Long.toHexString(request.sessionId));
+                            throw new IOException("Error: unexpected cxid for"
+                                    + "client session");
+                        }
+                        /*
+                         * We want to send our version of the request. the
+                         * pointer to the connection in the request
+                         */
+                        topPending.setHdr(request.getHdr());
+                        topPending.setTxn(request.getTxn());
+                        topPending.zxid = request.zxid;
+                        request = topPending;
+                    }
+
+                    sendToNextProcessor(request);
+
+                    waitForEmptyPool();
+
+                    /*
+                     * Process following reads if any, remove session queue if
+                     * empty.
+                     */
+                    if (sessionQueue != null) {
+                        while (!stopped && !sessionQueue.isEmpty()
+                                && !needCommit(sessionQueue.peek())) {
+                            sendToNextProcessor(sessionQueue.poll());
+                        }
+                        // Remove empty queues
+                        if (sessionQueue.isEmpty()) {
+                            pendingRequests.remove(request.sessionId);
+                        }
+                    }
+                }
+            } while (!stoppedMainLoop);
         } catch (Throwable e) {
             handleException(this.getName(), e);
         }
         LOG.info("CommitProcessor exited loop!");
     }
 
-    /*
-     * Separated this method from the main run loop
-     * for test purposes (ZOOKEEPER-1863)
-     */
-    protected void processCommitted() {
-        Request request;
-
-        if (!stopped && !isProcessingRequest() &&
-                (committedRequests.peek() != null)) {
-
-            /*
-             * ZOOKEEPER-1863: continue only if there is no new request
-             * waiting in queuedRequests or it is waiting for a
-             * commit. 
-             */
-            if ( !isWaitingForCommit() && !queuedRequests.isEmpty()) {
-                return;
-            }
-            request = committedRequests.poll();
-
-            /*
-             * We match with nextPending so that we can move to the
-             * next request when it is committed. We also want to
-             * use nextPending because it has the cnxn member set
-             * properly.
-             */
-            Request pending = nextPending.get();
-            if (pending != null &&
-                pending.sessionId == request.sessionId &&
-                pending.cxid == request.cxid) {
-                // we want to send our version of the request.
-                // the pointer to the connection in the request
-                pending.setHdr(request.getHdr());
-                pending.setTxn(request.getTxn());
-                pending.zxid = request.zxid;
-                // Set currentlyCommitting so we will block until this
-                // completes. Cleared by CommitWorkRequest after
-                // nextProcessor returns.
-                currentlyCommitting.set(pending);
-                nextPending.set(null);
-                sendToNextProcessor(pending);
-            } else {
-                // this request came from someone else so just
-                // send the commit packet
-                currentlyCommitting.set(request);
-                sendToNextProcessor(request);
+    private void waitForEmptyPool() throws InterruptedException {
+        synchronized(emptyPoolSync) {
+            while ((!stopped) && isProcessingRequest()) {
+                emptyPoolSync.wait();
             }
-        }      
+        }
     }
 
     @Override
@@ -259,6 +323,7 @@ public class CommitProcessor extends Zoo
                 "CommitProcWork", numWorkerThreads, true);
         }
         stopped = false;
+        stoppedMainLoop = false;
         super.start();
     }
 
@@ -295,21 +360,8 @@ public class CommitProcessor extends Zoo
             try {
                 nextProcessor.processRequest(request);
             } finally {
-                // If this request is the commit request that was blocking
-                // the processor, clear.
-                currentlyCommitting.compareAndSet(request, null);
-
-                /*
-                 * Decrement outstanding request count. The processor may be
-                 * blocked at the moment because it is waiting for the pipeline
-                 * to drain. In that case, wake it up if there are pending
-                 * requests.
-                 */
-                if (numRequestsProcessing.decrementAndGet() == 0) {
-                    if (!queuedRequests.isEmpty() ||
-                        !committedRequests.isEmpty()) {
-                        wakeup();
-                    }
+                if (numRequestsProcessing.decrementAndGet() == 0){
+                    wakeupOnEmpty();
                 }
             }
         }
@@ -319,6 +371,12 @@ public class CommitProcessor extends Zoo
         notifyAll();
     }
 
+    private void wakeupOnEmpty() {
+        synchronized(emptyPoolSync){
+            emptyPoolSync.notifyAll();
+        }
+    }
+    
     public void commit(Request request) {
         if (stopped || request == null) {
             return;
@@ -327,9 +385,7 @@ public class CommitProcessor extends Zoo
             LOG.debug("Committing request:: " + request);
         }
         committedRequests.add(request);
-        if (!isProcessingCommit()) {
-            wakeup();
-        }
+        wakeup();
     }
 
     public void processRequest(Request request) {
@@ -340,13 +396,13 @@ public class CommitProcessor extends Zoo
             LOG.debug("Processing request:: " + request);
         }
         queuedRequests.add(request);
-        if (!isWaitingForCommit()) {
-            wakeup();
-        }
+        wakeup();
     }
 
     private void halt() {
+        stoppedMainLoop = true;
         stopped = true;
+        wakeupOnEmpty();
         wakeup();
         queuedRequests.clear();
         if (workerPool != null) {

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java?rev=1743978&r1=1743977&r2=1743978&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java Sun May 15 21:31:54 2016
@@ -19,21 +19,27 @@
 package org.apache.zookeeper.server.quorum;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.proto.CreateRequest;
 import org.apache.zookeeper.proto.GetDataRequest;
-import org.apache.zookeeper.proto.SyncRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.WorkerService;
-import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException;
 import org.apache.zookeeper.server.ZooKeeperServerListener;
 import org.junit.After;
 import org.junit.Assert;
@@ -43,14 +49,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CommitProcessorConcurrencyTest extends ZKTestCase {
-    protected static final Logger LOG =
-            LoggerFactory.getLogger(CommitProcessorConcurrencyTest.class);
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(CommitProcessorConcurrencyTest.class);
 
-    Boolean executedFlag = false;
+    BlockingQueue<Request> processedRequests;
     MockCommitProcessor processor;
+    int defaultSizeOfThreadPool = 16;
 
     @Before
     public void setUp() throws Exception {
+        processedRequests = new LinkedBlockingQueue<Request>();
         processor = new MockCommitProcessor();
     }
 
@@ -59,84 +67,311 @@ public class CommitProcessorConcurrencyT
         processor.shutdown();
     }
 
-    class MockCommitProcessor extends CommitProcessor {
-
-        MockCommitProcessor() {
-          super( 
-                  new RequestProcessor() {
-                      public void processRequest(Request request) 
-                              throws RequestProcessorException {
-                          executedFlag = true;
-                      }
-                      public void shutdown(){}
-          },
-          "0",
-          false, new ZooKeeperServerListener(){
+    // This queue is infinite if we use "poll" to get requests, but returns a
+    // finite size when asked.
+    class MockRequestsQueue extends LinkedBlockingQueue<Request> {
+        private static final long serialVersionUID = 1L;
+        int readReqId = 0;
+
+        // Always have a request to return.
+        public Request poll() {
+            readReqId++;
+            try {
+                return newRequest(new GetDataRequest("/", false),
+                        OpCode.getData, readReqId % 50, readReqId);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            ;
+            return null;
+        }
 
-              @Override
-              public void notifyStopping(String errMsg, int exitCode) {
+        // Fixed queue size.
+        public int size() {
+            return 42;
+        }
+    }
 
-              }});
+    class MockCommitProcessor extends CommitProcessor {
+        MockCommitProcessor() {
+            super(new RequestProcessor() {
+                public void processRequest(Request request)
+                        throws RequestProcessorException {
+                    processedRequests.offer(request);
+                }
+
+                public void shutdown() {
+                }
+            }, "0", false, new ZooKeeperServerListener() {
+
+                @Override
+                public void notifyStopping(String threadName, int errorCode) {
+                }
+            });
         }
 
-        public void testStart() {
+        public void initThreads(int poolSize) {
             this.stopped = false;
-            this.workerPool = new WorkerService(
-                    "CommitProcWork", 1, true);
+            this.workerPool = new WorkerService("CommitProcWork", poolSize,
+                    true);
         }
+    }
+
+    private Request newRequest(Record rec, int type, int sessionId, int xid)
+            throws IOException {
+        ByteArrayOutputStream boas = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
+        rec.serialize(boa, "request");
+        ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
+        return new Request(null, sessionId, xid, type, bb, new ArrayList<Id>());
+    }
+
+    /**
+     * We place a read request followed by committed update request of the same
+     * session in queuedRequests. We verify that both requests are processed,
+     * according to the order of the session (first read, then the write).
+     */
+    @Test
+    public void committedAndUncommittedOfTheSameSessionRaceTest()
+            throws Exception {
+        final String path = "/testCvsUCRace";
+
+        Request readReq = newRequest(new GetDataRequest(path, false),
+                OpCode.getData, 0x0, 0);
+        Request writeReq = newRequest(
+                new SetDataRequest(path, new byte[16], -1), OpCode.setData, 0x0,
+                1);
+
+        processor.committedRequests.add(writeReq);
+        processor.queuedRequests.add(readReq);
+        processor.queuedRequests.add(writeReq);
+        processor.initThreads(1);
+
+        processor.stoppedMainLoop = true;
+        processor.run();
+
+        Assert.assertTrue(
+                "Request was not processed " + readReq + " instead "
+                        + processedRequests.peek(),
+                processedRequests.peek() != null
+                        && processedRequests.peek().equals(readReq));
+        processedRequests.poll();
+        Assert.assertTrue(
+                "Request was not processed " + writeReq + " instead "
+                        + processedRequests.peek(),
+                processedRequests.peek() != null
+                        && processedRequests.peek().equals(writeReq));
+    }
 
-        public void addToCommittedRequests(Request req) {
-            this.committedRequests.add(req);
+    /**
+     * Here we create the following requests queue structure: R1_1, W1_2, R1_3,
+     * R2_1, R2_2, W2_3, R2_4, R3_1, R3_2, R3_3, W3_4, R3_5, ... , W5_6, R5_7
+     * i.e., 5 sessions, each has different amount or read requests, followed by
+     * single write and afterwards single read. The idea is to check that all of
+     * the reads that can be processed concurrently do so, and that none of the
+     * uncommited requests, followed by the reads are processed.
+     */
+    @Test
+    public void processAsMuchUncommittedRequestsAsPossibleTest()
+            throws Exception {
+        final String path = "/testAsMuchAsPossible";
+        LinkedList<Request> shouldBeProcessed = new LinkedList<Request>();
+        HashSet<Request> shouldNotBeProcessed = new HashSet<Request>();
+        for (int sessionId = 1; sessionId <= 5; ++sessionId) {
+            for (int readReqId = 1; readReqId <= sessionId; ++readReqId) {
+                Request readReq = newRequest(new GetDataRequest(path, false),
+                        OpCode.getData, sessionId, readReqId);
+                shouldBeProcessed.add(readReq);
+                processor.queuedRequests.add(readReq);
+            }
+            Request writeReq = newRequest(
+                    new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                    OpCode.create, sessionId, sessionId + 1);
+            Request readReq = newRequest(new GetDataRequest(path, false),
+                    OpCode.getData, sessionId, sessionId + 2);
+            processor.queuedRequests.add(writeReq);
+            processor.queuedRequests.add(readReq);
+            shouldNotBeProcessed.add(writeReq);
+            shouldNotBeProcessed.add(readReq);
         }
+        processor.initThreads(defaultSizeOfThreadPool);
 
-        public void addToNextPending(Request req) {
-            this.nextPending.set(req);
+        processor.stoppedMainLoop = true;
+        processor.run();
+        Thread.sleep(1000);
+        shouldBeProcessed.removeAll(processedRequests);
+        for (Request r : shouldBeProcessed) {
+            LOG.error("Did not process " + r);
         }
+        Assert.assertTrue("Not all requests were processed",
+                shouldBeProcessed.isEmpty());
+        Assert.assertFalse("Processed a wrong request",
+                shouldNotBeProcessed.removeAll(processedRequests));
+    }
 
-        public void addToQueuedRequests(Request req) {
-            //this.numRequestsProcessing.incrementAndGet();
-            this.queuedRequests.add(req);
+    /**
+     * In the following test, we add a write request followed by several read
+     * requests of the same session, and we verify several things - 1. The write
+     * is not processed until commit arrives. 2. Once the write is processed,
+     * all the read requests are processed as well. 3. All read requests are
+     * executed after the write, before any other write, along with new reads.
+     */
+    @Test
+    public void processAllFollowingUncommittedAfterFirstCommitTest()
+            throws Exception {
+        final String path = "/testUncommittedFollowingCommited";
+        HashSet<Request> shouldBeInPending = new HashSet<Request>();
+        HashSet<Request> shouldBeProcessedAfterPending = new HashSet<Request>();
+
+        Request writeReq = newRequest(
+                new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                OpCode.create, 0x1, 1);
+        processor.queuedRequests.add(writeReq);
+        shouldBeInPending.add(writeReq);
+
+        for (int readReqId = 2; readReqId <= 5; ++readReqId) {
+            Request readReq = newRequest(new GetDataRequest(path, false),
+                    OpCode.getData, 0x1, readReqId);
+            processor.queuedRequests.add(readReq);
+            shouldBeInPending.add(readReq);
+            shouldBeProcessedAfterPending.add(readReq);
         }
+        processor.initThreads(defaultSizeOfThreadPool);
 
-        public void testProcessCommitted() {
-            this.processCommitted();
+        processor.stoppedMainLoop = true;
+        processor.run();
+        Assert.assertTrue("Processed without waiting for commit",
+                processedRequests.isEmpty());
+        Assert.assertTrue("Did not handled all of queuedRequests' requests",
+                processor.queuedRequests.isEmpty());
+
+        shouldBeInPending
+                .removeAll(processor.pendingRequests.get(writeReq.sessionId));
+        for (Request r : shouldBeInPending) {
+            LOG.error("Should be in pending " + r);
         }
+        Assert.assertTrue(
+                "Not all requests moved to pending from queuedRequests",
+                shouldBeInPending.isEmpty());
+
+        processor.committedRequests.add(writeReq);
+        processor.stoppedMainLoop = true;
+        processor.run();
+        processor.initThreads(defaultSizeOfThreadPool);
+
+        Thread.sleep(500);
+        Assert.assertTrue("Did not process committed request",
+                processedRequests.peek() == writeReq);
+        Assert.assertTrue("Did not process following read request",
+                processedRequests.containsAll(shouldBeProcessedAfterPending));
+        Assert.assertTrue("Did not process committed request",
+                processor.committedRequests.isEmpty());
+        Assert.assertTrue("Did not process committed request",
+                processor.pendingRequests.isEmpty());
+    }
 
-        @Override
-        public void shutdown() {
-            this.workerPool.stop();
+    /**
+     * In the following test, we verify that committed requests are processed
+     * even when queuedRequests never gets empty. We add 10 committed request
+     * and use infinite queuedRequests. We verify that the committed request was
+     * processed.
+     */
+    @Test(timeout = 1000)
+    public void noStarvationOfNonLocalCommittedRequestsTest() throws Exception {
+        final String path = "/noStarvationOfCommittedRequests";
+        processor.queuedRequests = new MockRequestsQueue();
+        HashSet<Request> nonLocalCommits = new HashSet<Request>();
+        for (int i = 0; i < 10; i++) {
+            Request nonLocalCommitReq = newRequest(
+                    new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                    OpCode.create, 51, i + 1);
+            processor.committedRequests.add(nonLocalCommitReq);
+            nonLocalCommits.add(nonLocalCommitReq);
+        }
+        for (int i = 0; i < 10; i++) {
+            processor.initThreads(defaultSizeOfThreadPool);
+            processor.stoppedMainLoop = true;
+            processor.run();
         }
+        Assert.assertTrue("commit request was not processed",
+                processedRequests.containsAll(nonLocalCommits));
     }
 
-    /*
-     * We populate the necessary data structures in the CommitProcessor
-     * instance and run processCommitted
+    /**
+     * In the following test, we verify that committed writes are not causing
+     * reads starvation. We populate the commit processor with the following
+     * order of requests: 1 committed local updated, 1 read request, 100
+     * committed non-local updates. 50 read requests. We verify that after the
+     * first call to processor.run, only the first write is processed, then
+     * after the second call, all reads are processed along with the second
+     * write.
      */
     @Test
-    public void raceTest() 
-    throws Exception {
+    public void noStarvationOfReadRequestsTest() throws Exception {
+        final String path = "/noStarvationOfReadRequests";
 
-       ByteArrayOutputStream boas = new ByteArrayOutputStream();
-       BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
-       GetDataRequest getReq = new GetDataRequest("/testrace", false);
-       getReq.serialize(boa, "request");
-       ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
-       Request readReq = new Request(null, 0x0, 0, OpCode.getData,
-               bb, new ArrayList<Id>());
-
-       boas.reset();
-       SyncRequest syncReq = new SyncRequest("/testrace");
-       syncReq.serialize(boa, "request");
-       bb = ByteBuffer.wrap(boas.toByteArray());
-       Request writeReq = new Request(null, 0x0, 0, OpCode.sync,
-                                 bb, new ArrayList<Id>());
-
-       processor.addToCommittedRequests(writeReq);
-       processor.addToQueuedRequests(readReq);
-       processor.addToQueuedRequests(writeReq);
-
-       processor.testStart();
-       processor.testProcessCommitted();
-       Assert.assertFalse("Next request processor executed", executedFlag);
+        // +1 committed requests (also head of queuedRequests)
+        Request firstCommittedReq = newRequest(
+                new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                OpCode.create, 0x3, 1);
+        processor.queuedRequests.add(firstCommittedReq);
+        processor.committedRequests.add(firstCommittedReq);
+        HashSet<Request> allReads = new HashSet<Request>();
+
+        // +1 read request to queuedRequests
+        Request firstRead = newRequest(new GetDataRequest(path, false),
+                OpCode.getData, 0x1, 0);
+        allReads.add(firstRead);
+        processor.queuedRequests.add(firstRead);
+
+        // +1 non local commit
+        Request secondCommittedReq = newRequest(
+                new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                OpCode.create, 0x99, 2);
+        processor.committedRequests.add(secondCommittedReq);
+
+        HashSet<Request> waitingCommittedRequests = new HashSet<Request>();
+        // +99 non local committed requests
+        for (int writeReqId = 3; writeReqId < 102; ++writeReqId) {
+            Request writeReq = newRequest(
+                    new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                    OpCode.create, 0x8, writeReqId);
+            processor.committedRequests.add(writeReq);
+            waitingCommittedRequests.add(writeReq);
+        }
+
+        // +50 read requests to queuedRequests
+        for (int readReqId = 1; readReqId <= 50; ++readReqId) {
+            Request readReq = newRequest(new GetDataRequest(path, false),
+                    OpCode.getData, 0x5, readReqId);
+            allReads.add(readReq);
+            processor.queuedRequests.add(readReq);
+        }
+
+        processor.initThreads(defaultSizeOfThreadPool);
+
+        processor.stoppedMainLoop = true;
+        processor.run();
+        Assert.assertTrue("Did not process the first write request",
+                processedRequests.contains(firstCommittedReq));
+        for (Request r : allReads) {
+            Assert.assertTrue("Processed read request",
+                    !processedRequests.contains(r));
+        }
+        processor.run();
+        Assert.assertTrue("did not processed all reads",
+                processedRequests.containsAll(allReads));
+        Assert.assertTrue("Did not process the second write request",
+                processedRequests.contains(secondCommittedReq));
+        for (Request r : waitingCommittedRequests) {
+            Assert.assertTrue("Processed additional committed request",
+                    !processedRequests.contains(r));
+        }
     }
 }
\ No newline at end of file

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java?rev=1743978&r1=1743977&r2=1743978&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java Sun May 15 21:31:54 2016
@@ -24,27 +24,24 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.zookeeper.ZKTestCase;
-import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.proto.CreateRequest;
 import org.apache.zookeeper.proto.GetDataRequest;
 import org.apache.zookeeper.server.FinalRequestProcessor;
 import org.apache.zookeeper.server.PrepRequestProcessor;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
-import org.apache.zookeeper.server.SessionTracker;
 import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.quorum.CommitProcessor;
 import org.apache.zookeeper.test.ClientBase;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -74,27 +71,52 @@ public class CommitProcessorTest extends
     protected static final Logger LOG =
         LoggerFactory.getLogger(CommitProcessorTest.class);
 
-
+    // The amount of ms each test case should run
+    static final int TEST_RUN_TIME_IN_MS = 5000;
     private AtomicInteger processedReadRequests = new AtomicInteger(0);
     private AtomicInteger processedWriteRequests = new AtomicInteger(0);
 
+    boolean stopped;
     TestZooKeeperServer zks;
     File tmpDir;
     ArrayList<TestClientThread> testClients =
-        new ArrayList<TestClientThread>();
+            new ArrayList<TestClientThread>();
+    CommitProcessor commitProcessor;
 
-    public void setUp(int numCommitThreads, int numClientThreads)
+    public void setUp(int numCommitThreads, int numClientThreads, int writePercent)
             throws Exception {
+        stopped = false;
         System.setProperty(
             CommitProcessor.ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS,
             Integer.toString(numCommitThreads));
-        System.setProperty("zookeeper.admin.enableServer", "false");
         tmpDir = ClientBase.createTmpDir();
         ClientBase.setupTestEnv();
         zks = new TestZooKeeperServer(tmpDir, tmpDir, 4000);
         zks.startup();
         for(int i=0; i<numClientThreads; ++i) {
-            TestClientThread client = new TestClientThread();
+            TestClientThread client = new TestClientThread(writePercent);
+            testClients.add(client);
+            client.start();
+        }
+    }
+    
+    public void setUp(int numCommitThreads, int numReadOnlyClientThreads, int mixWorkloadClientThreads, int writePercent)
+            throws Exception {
+        stopped = false;
+        System.setProperty(
+            CommitProcessor.ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS,
+            Integer.toString(numCommitThreads));
+        tmpDir = ClientBase.createTmpDir();
+        ClientBase.setupTestEnv();
+        zks = new TestZooKeeperServer(tmpDir, tmpDir, 4000);
+        zks.startup();
+        for(int i=0; i<mixWorkloadClientThreads; ++i) {
+            TestClientThread client = new TestClientThread(writePercent);
+            testClients.add(client);
+            client.start();
+        }
+        for(int i=0; i<numReadOnlyClientThreads; ++i) {
+            TestClientThread client = new TestClientThread(0);
             testClients.add(client);
             client.start();
         }
@@ -103,25 +125,32 @@ public class CommitProcessorTest extends
     @After
     public void tearDown() throws Exception {
         LOG.info("tearDown starting");
+        stopped = true;
+
+        zks.shutdown();
         for(TestClientThread client : testClients) {
             client.interrupt();
             client.join();
         }
-        zks.shutdown();
-
         if (tmpDir != null) {
             Assert.assertTrue("delete " + tmpDir.toString(),
                               ClientBase.recursiveDelete(tmpDir));
         }
+        processedReadRequests.set(0);
+        processedWriteRequests.set(0);
+        testClients.clear();
+        commitProcessor.join();
     }
 
     private class TestClientThread extends Thread {
         long sessionId;
         int cxid;
         int nodeId;
+        int writePercent;
 
-        public TestClientThread() {
+        public TestClientThread(int writePercent) {
             sessionId = zks.getSessionTracker().createSession(5000);
+            this.writePercent = writePercent;
         }
 
         public void sendWriteRequest() throws Exception {
@@ -134,7 +163,8 @@ public class CommitProcessorTest extends
             ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
             Request req = new Request(null, sessionId, ++cxid, OpCode.create,
                                       bb, new ArrayList<Id>());
-            zks.firstProcessor.processRequest(req);
+            zks.getFirstProcessor().processRequest(req);
+
         }
 
         public void sendReadRequest() throws Exception {
@@ -146,20 +176,20 @@ public class CommitProcessorTest extends
             ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
             Request req = new Request(null, sessionId, ++cxid, OpCode.getData,
                                       bb, new ArrayList<Id>());
-            zks.firstProcessor.processRequest(req);
+            zks.getFirstProcessor().processRequest(req);
         }
 
         public void run() {
             Random rand = new Random(Thread.currentThread().getId());
             try {
                 sendWriteRequest();
-                for(int i=0; i<1000; ++i) {
-                    // Do 25% write / 75% read request mix
-                    if (rand.nextInt(100) < 25) {
+                while(!stopped) {
+                    if (rand.nextInt(100) < writePercent) {
                         sendWriteRequest();
                     } else {
                         sendReadRequest();
                     }
+                    Thread.sleep(5 + rand.nextInt(95));
                 }
             } catch (Exception e) {
                 LOG.error("Uncaught exception in test: ", e);
@@ -168,41 +198,85 @@ public class CommitProcessorTest extends
     }
 
     @Test
-    public void testNoCommitWorkers() throws Exception {
-        setUp(0, 10);
-        synchronized(this) {
-            wait(5000);
+    public void testNoCommitWorkersReadOnlyWorkload() throws Exception {
+        int numClients = 10;
+        LOG.info("testNoCommitWorkersReadOnlyWorkload");
+        setUp(0, numClients, 0);
+        synchronized (this) {
+            wait(TEST_RUN_TIME_IN_MS);
         }
+        Assert.assertFalse(fail);
+        Assert.assertTrue("No read requests processed", processedReadRequests.get() > 0);
+        // processedWriteRequests.get() == numClients since each client performs one write at the beginning (creates a znode)
+        Assert.assertTrue("Write requests processed", processedWriteRequests.get() == numClients);
+    }
+    
+    @Test
+    public void testNoCommitWorkersMixedWorkload() throws Exception {
+        int numClients = 10;
+        LOG.info("testNoCommitWorkersMixedWorkload 25w/75r workload test");
+        setUp(0, numClients, 25);
+        synchronized (this) {
+            wait(TEST_RUN_TIME_IN_MS);
+        }
+        Assert.assertFalse(fail);
         checkProcessedRequest();
+    }
+    
+    @Test
+    public void testOneCommitWorkerReadOnlyWorkload() throws Exception {
+        int numClients = 10;
+        LOG.info("testOneCommitWorkerReadOnlyWorkload");
+        setUp(1, numClients, 0);
+        synchronized (this) {
+            wait(TEST_RUN_TIME_IN_MS);
+        }
         Assert.assertFalse(fail);
+        Assert.assertTrue("No read requests processed", processedReadRequests.get() > 0);
+        // processedWriteRequests.get() == numClients since each client performs one write at the beginning (creates a znode)
+        Assert.assertTrue("Write requests processed", processedWriteRequests.get() == numClients);
     }
 
     @Test
-    public void testOneCommitWorker() throws Exception {
-        setUp(1, 10);
+    public void testOneCommitWorkerMixedWorkload() throws Exception {
+        setUp(1, 10, 25);
+        LOG.info("testOneCommitWorkerMixedWorkload 25w/75r workload test");
         synchronized(this) {
-            wait(5000);
+            wait(TEST_RUN_TIME_IN_MS);
         }
+        Assert.assertFalse(fail);
         checkProcessedRequest();
+    }
+    
+    
+    @Test
+    public void testManyCommitWorkersReadOnly() throws Exception {
+        int numClients = 10;
+        LOG.info("testManyCommitWorkersReadOnly");
+        setUp(10, numClients, 0);
+        synchronized(this) {
+            wait(TEST_RUN_TIME_IN_MS);
+        }
         Assert.assertFalse(fail);
+        Assert.assertTrue("No read requests processed", processedReadRequests.get() > 0);
+        // processedWriteRequests.get() == numClients since each client performs one write at the beginning (creates a znode)
+        Assert.assertTrue("Write requests processed", processedWriteRequests.get() == numClients);
     }
-
+    
     @Test
-    public void testManyCommitWorkers() throws Exception {
-        setUp(10, 10);
+    public void testManyCommitWorkersMixedWorkload() throws Exception {
+        setUp(16, 8 , 8, 25);
+        LOG.info("testManyCommitWorkersMixedWorkload 8X0w/100r + 8X25w/75r workload test");
         synchronized(this) {
-            wait(5000);
+            wait(TEST_RUN_TIME_IN_MS);
         }
-        checkProcessedRequest();
         Assert.assertFalse(fail);
-
+        checkProcessedRequest();
     }
 
     private void checkProcessedRequest() {
-        Assert.assertTrue("No read requests processed",
-                processedReadRequests.get() > 0);
-        Assert.assertTrue("No write requests processed",
-                processedWriteRequests.get() > 0);
+        Assert.assertTrue("No read requests processed", processedReadRequests.get() > 0);
+        Assert.assertTrue("No write requests processed", processedWriteRequests.get() > 0);
     }
 
     volatile boolean fail = false;
@@ -213,16 +287,13 @@ public class CommitProcessorTest extends
     }
 
     private class TestZooKeeperServer extends ZooKeeperServer {
-        PrepRequestProcessor firstProcessor;
-        CommitProcessor commitProcessor;
-
         public TestZooKeeperServer(File snapDir, File logDir, int tickTime)
                 throws IOException {
             super(snapDir, logDir, tickTime);
         }
 
-        public SessionTracker getSessionTracker() {
-            return sessionTracker;
+        public PrepRequestProcessor getFirstProcessor(){
+            return (PrepRequestProcessor) firstProcessor;
         }
 
         // Leader mock: Prep -> MockProposal -> Commit -> validate -> Final
@@ -234,18 +305,17 @@ public class CommitProcessorTest extends
             // processor, so it can do pre/post validating of requests
             ValidateProcessor validateProcessor =
                 new ValidateProcessor(finalProcessor);
-            commitProcessor = new CommitProcessor(validateProcessor, "1", true,
-                    getZooKeeperServerListener());
+            commitProcessor = new CommitProcessor(validateProcessor, "1", true, null);
             validateProcessor.setCommitProcessor(commitProcessor);
             commitProcessor.start();
             MockProposalRequestProcessor proposalProcessor =
                 new MockProposalRequestProcessor(commitProcessor);
             proposalProcessor.start();
             firstProcessor = new PrepRequestProcessor(zks, proposalProcessor);
-            firstProcessor.start();
+            getFirstProcessor().start();
         }
     }
-
+    
     private class MockProposalRequestProcessor extends Thread
             implements RequestProcessor {
         private final CommitProcessor commitProcessor;
@@ -261,9 +331,12 @@ public class CommitProcessorTest extends
             Random rand = new Random(Thread.currentThread().getId());
             try {
                 while(true) {
-                    Request request = proposals.take();
-                    Thread.sleep(10 + rand.nextInt(190));
-                    commitProcessor.commit(request);
+                    // If it is a read-only test, there will be no proposals..
+                    if (!proposals.isEmpty()){
+                        Request request = proposals.take();
+                        Thread.sleep(5 + rand.nextInt(95));
+                        commitProcessor.commit(request);
+                    }
                 }
             } catch (InterruptedException e) {
                 // ignore
@@ -282,11 +355,15 @@ public class CommitProcessorTest extends
 
         @Override
         public void shutdown() {
-            // TODO Auto-generated method stub
-
+            LOG.info("shutdown MockProposalRequestProcessor");
+            proposals.clear();
+            if (commitProcessor != null) {
+                commitProcessor.shutdown();
+            }
         }
     }
 
+    
     private class ValidateProcessor implements RequestProcessor {
         Random rand = new Random(Thread.currentThread().getId());
         RequestProcessor nextProcessor;
@@ -310,6 +387,14 @@ public class CommitProcessorTest extends
         @Override
         public void processRequest(Request request)
                 throws RequestProcessorException {
+            if (stopped)
+                return;
+            if (request.type == OpCode.closeSession){
+                LOG.debug("ValidateProcessor got closeSession request=" + request);
+                nextProcessor.processRequest(request);
+                return;
+            }
+
             boolean isWriteRequest = commitProcessor.needCommit(request);
             if (isWriteRequest) {
                 outstandingWriteRequests.incrementAndGet();
@@ -322,15 +407,14 @@ public class CommitProcessorTest extends
                 outstandingReadRequests.incrementAndGet();
                 validateReadRequestVariant(request);
             }
-
+            
             // Insert random delay to test thread race conditions
             try {
-                Thread.sleep(10 + rand.nextInt(290));
+                Thread.sleep(5 + rand.nextInt(25));
             } catch(InterruptedException e) {
                 // ignore
             }
             nextProcessor.processRequest(request);
-
             /*
              * The commit workers will have to execute this line before they
              * wake up the commit processor. So this value is up-to-date when
@@ -354,6 +438,8 @@ public class CommitProcessorTest extends
          * Validate that this is the only request in the pipeline
          */
         private void validateWriteRequestVariant(Request request) {
+            if (stopped)
+                return;
             long zxid = request.getHdr().getZxid();
             int readRequests = outstandingReadRequests.get();
             if (readRequests != 0) {
@@ -384,7 +470,9 @@ public class CommitProcessorTest extends
         }
 
         private void validateRequest(Request request) {
-            LOG.info("Got request " + request);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Got request " + request);
+            }
 
             // Zxids should always be in order for write requests
             if (request.getHdr() != null) {
@@ -418,8 +506,12 @@ public class CommitProcessorTest extends
 
         @Override
         public void shutdown() {
-            // TODO Auto-generated method stub
+            LOG.info("shutdown validateReadRequestVariant");
+            cxidMap.clear();
+            expectedZxid = new AtomicLong(1);
+            if (nextProcessor!=null){
+                nextProcessor.shutdown();
+            }
         }
     }
-
-}
+}
\ No newline at end of file



Mime
View raw message