Subject: svn commit: r1743978 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/
Date: Sun, 15 May 2016 21:31:54 -0000
To: commits@zookeeper.apache.org
From: shralex@apache.org
Message-Id: <20160515213154.EE4F93A00EC@svn01-us-west.apache.org>

Author: shralex
Date: Sun May 15 21:31:54 2016
New Revision: 1743978

URL: http://svn.apache.org/viewvc?rev=1743978&view=rev
Log:
ZOOKEEPER-2024 Major throughput improvement with mixed workloads (Kfir Lev-Ari via shralex)

Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1743978&r1=1743977&r2=1743978&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Sun May 15 21:31:54 2016
@@ -306,6 +306,8 @@ BUGFIXES:
 (CVE-2014-3488) (Michael Han via phunt)

 IMPROVEMENTS:
+ ZOOKEEPER-2024 Major throughput improvement with mixed workloads (Kfir Lev-Ari via
shralex) + ZOOKEEPER-1660 Documentation for Dynamic Reconfiguration (Reed Wanderman-Milne via shralex) ZOOKEEPER-2017 New tests for reconfig failure cases (Alexander Shraer and Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=1743978&r1=1743977&r2=1743978&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Sun May 15 21:31:54 2016 @@ -18,8 +18,10 @@ package org.apache.zookeeper.server.quorum; +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.LinkedBlockingQueue; import org.slf4j.Logger; @@ -35,7 +37,8 @@ import org.apache.zookeeper.server.ZooKe * This RequestProcessor matches the incoming committed requests with the * locally submitted requests. The trick is that locally submitted requests that * change the state of the system will come back as incoming committed requests, - * so we need to match them up. + * so we need to match them up. Instead of just waiting for the committed requests, + * we process the uncommitted requests that belong to other sessions. * * The CommitProcessor is multi-threaded. Communication between threads is * handled via queues, atomics, and wait/notifyAll synchronized on the @@ -76,9 +79,9 @@ public class CommitProcessor extends Zoo "zookeeper.commitProcessor.shutdownTimeout"; /** - * Requests that we are holding until the commit comes in. + * Incoming requests. 
*/ - protected final LinkedBlockingQueue queuedRequests = + protected LinkedBlockingQueue queuedRequests = new LinkedBlockingQueue(); /** @@ -87,26 +90,31 @@ public class CommitProcessor extends Zoo protected final LinkedBlockingQueue committedRequests = new LinkedBlockingQueue(); - /** Request for which we are currently awaiting a commit */ - protected final AtomicReference nextPending = - new AtomicReference(); - /** Request currently being committed (ie, sent off to next processor) */ - private final AtomicReference currentlyCommitting = - new AtomicReference(); + /** + * Requests that we are holding until commit comes in. Keys represent + * session ids, each value is a linked list of the session's requests. + */ + protected final HashMap> pendingRequests = + new HashMap>(10000); /** The number of requests currently being processed */ - protected AtomicInteger numRequestsProcessing = new AtomicInteger(0); + protected final AtomicInteger numRequestsProcessing = new AtomicInteger(0); RequestProcessor nextProcessor; + /** For testing purposes, we use a separated stopping condition for the + * outer loop.*/ + protected volatile boolean stoppedMainLoop = true; protected volatile boolean stopped = true; private long workerShutdownTimeoutMS; protected WorkerService workerPool; + private Object emptyPoolSync = new Object(); /** - * This flag indicates whether we need to wait for a response to come back from the - * leader or we just let the sync operation flow through like a read. The flag will - * be true if the CommitProcessor is in a Leader pipeline. + * This flag indicates whether we need to wait for a response to come back + * from the leader or we just let the sync operation flow through like a + * read. The flag will be true if the CommitProcessor is in a Leader + * pipeline. 
*/ boolean matchSyncs; @@ -121,14 +129,6 @@ public class CommitProcessor extends Zoo return numRequestsProcessing.get() != 0; } - private boolean isWaitingForCommit() { - return nextPending.get() != null; - } - - private boolean isProcessingCommit() { - return currentlyCommitting.get() != null; - } - protected boolean needCommit(Request request) { switch (request.type) { case OpCode.create: @@ -153,94 +153,158 @@ public class CommitProcessor extends Zoo @Override public void run() { - Request request; try { - while (!stopped) { - synchronized(this) { - while ( - !stopped && - ((queuedRequests.isEmpty() || isWaitingForCommit() || isProcessingCommit()) && - (committedRequests.isEmpty() || isProcessingRequest()))) { - wait(); + /* + * In each iteration of the following loop we process at most + * requestsToProcess requests of queuedRequests. We have to limit + * the number of request we poll from queuedRequests, since it is + * possible to endlessly poll read requests from queuedRequests, and + * that will lead to a starvation of non-local committed requests. + */ + int requestsToProcess = 0; + boolean commitIsWaiting = false; + do { + /* + * Since requests are placed in the queue before being sent to + * the leader, if commitIsWaiting = true, the commit belongs to + * the first update operation in the queuedRequests or to a + * request from a client on another server (i.e., the order of + * the following two lines is important!). 
+ */ + commitIsWaiting = !committedRequests.isEmpty(); + requestsToProcess = queuedRequests.size(); + // Avoid sync if we have something to do + if (requestsToProcess == 0 && !commitIsWaiting){ + // Waiting for requests to process + synchronized (this) { + while (!stopped && requestsToProcess == 0 + && !commitIsWaiting) { + wait(); + commitIsWaiting = !committedRequests.isEmpty(); + requestsToProcess = queuedRequests.size(); + } } } - /* - * Processing queuedRequests: Process the next requests until we - * find one for which we need to wait for a commit. We cannot - * process a read request while we are processing write request. + * Processing up to requestsToProcess requests from the incoming + * queue (queuedRequests), possibly less if a committed request + * is present along with a pending local write. After the loop, + * we process one committed request if commitIsWaiting. */ - while (!stopped && !isWaitingForCommit() && - !isProcessingCommit() && - (request = queuedRequests.poll()) != null) { - if (needCommit(request)) { - nextPending.set(request); - } else { + Request request = null; + while (!stopped && requestsToProcess > 0 + && (request = queuedRequests.poll()) != null) { + requestsToProcess--; + if (needCommit(request) + || pendingRequests.containsKey(request.sessionId)) { + // Add request to pending + LinkedList requests = pendingRequests + .get(request.sessionId); + if (requests == null) { + requests = new LinkedList(); + pendingRequests.put(request.sessionId, requests); + } + requests.addLast(request); + } + else { sendToNextProcessor(request); } + /* + * Stop feeding the pool if there is a local pending update + * and a committed request that is ready. Once we have a + * pending request with a waiting committed request, we know + * we can process the committed one. 
This is because commits + * for local requests arrive in the order they appeared in + * the queue, so if we have a pending request and a + * committed request, the committed request must be for that + * pending write or for a write originating at a different + * server. + */ + if (!pendingRequests.isEmpty() && !committedRequests.isEmpty()){ + /* + * We set commitIsWaiting so that we won't check + * committedRequests again. + */ + commitIsWaiting = true; + break; + } } - /* - * Processing committedRequests: check and see if the commit - * came in for the pending request. We can only commit a - * request when there is no other request being processed. - */ - processCommitted(); - } + // Handle a single committed request + if (commitIsWaiting && !stopped){ + waitForEmptyPool(); + + if (stopped){ + return; + } + + // Process committed head + if ((request = committedRequests.poll()) == null) { + throw new IOException("Error: committed head is null"); + } + + /* + * Check if request is pending, if so, update it with the + * committed info + */ + LinkedList sessionQueue = pendingRequests + .get(request.sessionId); + if (sessionQueue != null) { + // If session queue != null, then it is also not empty. + Request topPending = sessionQueue.poll(); + if (request.cxid != topPending.cxid) { + LOG.error( + "Got cxid 0x" + + Long.toHexString(request.cxid) + + " expected 0x" + Long.toHexString( + topPending.cxid) + + " for client session id " + + Long.toHexString(request.sessionId)); + throw new IOException("Error: unexpected cxid for" + + "client session"); + } + /* + * We want to send our version of the request. the + * pointer to the connection in the request + */ + topPending.setHdr(request.getHdr()); + topPending.setTxn(request.getTxn()); + topPending.zxid = request.zxid; + request = topPending; + } + + sendToNextProcessor(request); + + waitForEmptyPool(); + + /* + * Process following reads if any, remove session queue if + * empty. 
+ */ + if (sessionQueue != null) { + while (!stopped && !sessionQueue.isEmpty() + && !needCommit(sessionQueue.peek())) { + sendToNextProcessor(sessionQueue.poll()); + } + // Remove empty queues + if (sessionQueue.isEmpty()) { + pendingRequests.remove(request.sessionId); + } + } + } + } while (!stoppedMainLoop); } catch (Throwable e) { handleException(this.getName(), e); } LOG.info("CommitProcessor exited loop!"); } - /* - * Separated this method from the main run loop - * for test purposes (ZOOKEEPER-1863) - */ - protected void processCommitted() { - Request request; - - if (!stopped && !isProcessingRequest() && - (committedRequests.peek() != null)) { - - /* - * ZOOKEEPER-1863: continue only if there is no new request - * waiting in queuedRequests or it is waiting for a - * commit. - */ - if ( !isWaitingForCommit() && !queuedRequests.isEmpty()) { - return; - } - request = committedRequests.poll(); - - /* - * We match with nextPending so that we can move to the - * next request when it is committed. We also want to - * use nextPending because it has the cnxn member set - * properly. - */ - Request pending = nextPending.get(); - if (pending != null && - pending.sessionId == request.sessionId && - pending.cxid == request.cxid) { - // we want to send our version of the request. - // the pointer to the connection in the request - pending.setHdr(request.getHdr()); - pending.setTxn(request.getTxn()); - pending.zxid = request.zxid; - // Set currentlyCommitting so we will block until this - // completes. Cleared by CommitWorkRequest after - // nextProcessor returns. 
- currentlyCommitting.set(pending); - nextPending.set(null); - sendToNextProcessor(pending); - } else { - // this request came from someone else so just - // send the commit packet - currentlyCommitting.set(request); - sendToNextProcessor(request); + private void waitForEmptyPool() throws InterruptedException { + synchronized(emptyPoolSync) { + while ((!stopped) && isProcessingRequest()) { + emptyPoolSync.wait(); } - } + } } @Override @@ -259,6 +323,7 @@ public class CommitProcessor extends Zoo "CommitProcWork", numWorkerThreads, true); } stopped = false; + stoppedMainLoop = false; super.start(); } @@ -295,21 +360,8 @@ public class CommitProcessor extends Zoo try { nextProcessor.processRequest(request); } finally { - // If this request is the commit request that was blocking - // the processor, clear. - currentlyCommitting.compareAndSet(request, null); - - /* - * Decrement outstanding request count. The processor may be - * blocked at the moment because it is waiting for the pipeline - * to drain. In that case, wake it up if there are pending - * requests. 
- */ - if (numRequestsProcessing.decrementAndGet() == 0) { - if (!queuedRequests.isEmpty() || - !committedRequests.isEmpty()) { - wakeup(); - } + if (numRequestsProcessing.decrementAndGet() == 0){ + wakeupOnEmpty(); } } } @@ -319,6 +371,12 @@ public class CommitProcessor extends Zoo notifyAll(); } + private void wakeupOnEmpty() { + synchronized(emptyPoolSync){ + emptyPoolSync.notifyAll(); + } + } + public void commit(Request request) { if (stopped || request == null) { return; @@ -327,9 +385,7 @@ public class CommitProcessor extends Zoo LOG.debug("Committing request:: " + request); } committedRequests.add(request); - if (!isProcessingCommit()) { - wakeup(); - } + wakeup(); } public void processRequest(Request request) { @@ -340,13 +396,13 @@ public class CommitProcessor extends Zoo LOG.debug("Processing request:: " + request); } queuedRequests.add(request); - if (!isWaitingForCommit()) { - wakeup(); - } + wakeup(); } private void halt() { + stoppedMainLoop = true; stopped = true; + wakeupOnEmpty(); wakeup(); queuedRequests.clear(); if (workerPool != null) { Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java?rev=1743978&r1=1743977&r2=1743978&view=diff ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java (original) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java Sun May 15 21:31:54 2016 @@ -19,21 +19,27 @@ package org.apache.zookeeper.server.quorum; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.concurrent.BlockingQueue; +import 
java.util.concurrent.LinkedBlockingQueue; import org.apache.jute.BinaryOutputArchive; +import org.apache.jute.Record; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZKTestCase; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.proto.CreateRequest; import org.apache.zookeeper.proto.GetDataRequest; -import org.apache.zookeeper.proto.SyncRequest; +import org.apache.zookeeper.proto.SetDataRequest; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; import org.apache.zookeeper.server.WorkerService; -import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException; import org.apache.zookeeper.server.ZooKeeperServerListener; import org.junit.After; import org.junit.Assert; @@ -43,14 +49,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class CommitProcessorConcurrencyTest extends ZKTestCase { - protected static final Logger LOG = - LoggerFactory.getLogger(CommitProcessorConcurrencyTest.class); + protected static final Logger LOG = LoggerFactory + .getLogger(CommitProcessorConcurrencyTest.class); - Boolean executedFlag = false; + BlockingQueue processedRequests; MockCommitProcessor processor; + int defaultSizeOfThreadPool = 16; @Before public void setUp() throws Exception { + processedRequests = new LinkedBlockingQueue(); processor = new MockCommitProcessor(); } @@ -59,84 +67,311 @@ public class CommitProcessorConcurrencyT processor.shutdown(); } - class MockCommitProcessor extends CommitProcessor { - - MockCommitProcessor() { - super( - new RequestProcessor() { - public void processRequest(Request request) - throws RequestProcessorException { - executedFlag = true; - } - public void shutdown(){} - }, - "0", - false, new ZooKeeperServerListener(){ + // This queue is infinite if we use "poll" to get requests, but returns a + // finite size when asked. 
+ class MockRequestsQueue extends LinkedBlockingQueue { + private static final long serialVersionUID = 1L; + int readReqId = 0; + + // Always have a request to return. + public Request poll() { + readReqId++; + try { + return newRequest(new GetDataRequest("/", false), + OpCode.getData, readReqId % 50, readReqId); + } catch (IOException e) { + e.printStackTrace(); + } + ; + return null; + } - @Override - public void notifyStopping(String errMsg, int exitCode) { + // Fixed queue size. + public int size() { + return 42; + } + } - }}); + class MockCommitProcessor extends CommitProcessor { + MockCommitProcessor() { + super(new RequestProcessor() { + public void processRequest(Request request) + throws RequestProcessorException { + processedRequests.offer(request); + } + + public void shutdown() { + } + }, "0", false, new ZooKeeperServerListener() { + + @Override + public void notifyStopping(String threadName, int errorCode) { + } + }); } - public void testStart() { + public void initThreads(int poolSize) { this.stopped = false; - this.workerPool = new WorkerService( - "CommitProcWork", 1, true); + this.workerPool = new WorkerService("CommitProcWork", poolSize, + true); } + } + + private Request newRequest(Record rec, int type, int sessionId, int xid) + throws IOException { + ByteArrayOutputStream boas = new ByteArrayOutputStream(); + BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas); + rec.serialize(boa, "request"); + ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); + return new Request(null, sessionId, xid, type, bb, new ArrayList()); + } + + /** + * We place a read request followed by committed update request of the same + * session in queuedRequests. We verify that both requests are processed, + * according to the order of the session (first read, then the write). 
+ */ + @Test + public void committedAndUncommittedOfTheSameSessionRaceTest() + throws Exception { + final String path = "/testCvsUCRace"; + + Request readReq = newRequest(new GetDataRequest(path, false), + OpCode.getData, 0x0, 0); + Request writeReq = newRequest( + new SetDataRequest(path, new byte[16], -1), OpCode.setData, 0x0, + 1); + + processor.committedRequests.add(writeReq); + processor.queuedRequests.add(readReq); + processor.queuedRequests.add(writeReq); + processor.initThreads(1); + + processor.stoppedMainLoop = true; + processor.run(); + + Assert.assertTrue( + "Request was not processed " + readReq + " instead " + + processedRequests.peek(), + processedRequests.peek() != null + && processedRequests.peek().equals(readReq)); + processedRequests.poll(); + Assert.assertTrue( + "Request was not processed " + writeReq + " instead " + + processedRequests.peek(), + processedRequests.peek() != null + && processedRequests.peek().equals(writeReq)); + } - public void addToCommittedRequests(Request req) { - this.committedRequests.add(req); + /** + * Here we create the following requests queue structure: R1_1, W1_2, R1_3, + * R2_1, R2_2, W2_3, R2_4, R3_1, R3_2, R3_3, W3_4, R3_5, ... , W5_6, R5_7 + * i.e., 5 sessions, each has different amount or read requests, followed by + * single write and afterwards single read. The idea is to check that all of + * the reads that can be processed concurrently do so, and that none of the + * uncommited requests, followed by the reads are processed. 
+ */ + @Test + public void processAsMuchUncommittedRequestsAsPossibleTest() + throws Exception { + final String path = "/testAsMuchAsPossible"; + LinkedList shouldBeProcessed = new LinkedList(); + HashSet shouldNotBeProcessed = new HashSet(); + for (int sessionId = 1; sessionId <= 5; ++sessionId) { + for (int readReqId = 1; readReqId <= sessionId; ++readReqId) { + Request readReq = newRequest(new GetDataRequest(path, false), + OpCode.getData, sessionId, readReqId); + shouldBeProcessed.add(readReq); + processor.queuedRequests.add(readReq); + } + Request writeReq = newRequest( + new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, sessionId, sessionId + 1); + Request readReq = newRequest(new GetDataRequest(path, false), + OpCode.getData, sessionId, sessionId + 2); + processor.queuedRequests.add(writeReq); + processor.queuedRequests.add(readReq); + shouldNotBeProcessed.add(writeReq); + shouldNotBeProcessed.add(readReq); } + processor.initThreads(defaultSizeOfThreadPool); - public void addToNextPending(Request req) { - this.nextPending.set(req); + processor.stoppedMainLoop = true; + processor.run(); + Thread.sleep(1000); + shouldBeProcessed.removeAll(processedRequests); + for (Request r : shouldBeProcessed) { + LOG.error("Did not process " + r); } + Assert.assertTrue("Not all requests were processed", + shouldBeProcessed.isEmpty()); + Assert.assertFalse("Processed a wrong request", + shouldNotBeProcessed.removeAll(processedRequests)); + } - public void addToQueuedRequests(Request req) { - //this.numRequestsProcessing.incrementAndGet(); - this.queuedRequests.add(req); + /** + * In the following test, we add a write request followed by several read + * requests of the same session, and we verify several things - 1. The write + * is not processed until commit arrives. 2. Once the write is processed, + * all the read requests are processed as well. 3. 
All read requests are + * executed after the write, before any other write, along with new reads. + */ + @Test + public void processAllFollowingUncommittedAfterFirstCommitTest() + throws Exception { + final String path = "/testUncommittedFollowingCommited"; + HashSet shouldBeInPending = new HashSet(); + HashSet shouldBeProcessedAfterPending = new HashSet(); + + Request writeReq = newRequest( + new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, 0x1, 1); + processor.queuedRequests.add(writeReq); + shouldBeInPending.add(writeReq); + + for (int readReqId = 2; readReqId <= 5; ++readReqId) { + Request readReq = newRequest(new GetDataRequest(path, false), + OpCode.getData, 0x1, readReqId); + processor.queuedRequests.add(readReq); + shouldBeInPending.add(readReq); + shouldBeProcessedAfterPending.add(readReq); } + processor.initThreads(defaultSizeOfThreadPool); - public void testProcessCommitted() { - this.processCommitted(); + processor.stoppedMainLoop = true; + processor.run(); + Assert.assertTrue("Processed without waiting for commit", + processedRequests.isEmpty()); + Assert.assertTrue("Did not handled all of queuedRequests' requests", + processor.queuedRequests.isEmpty()); + + shouldBeInPending + .removeAll(processor.pendingRequests.get(writeReq.sessionId)); + for (Request r : shouldBeInPending) { + LOG.error("Should be in pending " + r); } + Assert.assertTrue( + "Not all requests moved to pending from queuedRequests", + shouldBeInPending.isEmpty()); + + processor.committedRequests.add(writeReq); + processor.stoppedMainLoop = true; + processor.run(); + processor.initThreads(defaultSizeOfThreadPool); + + Thread.sleep(500); + Assert.assertTrue("Did not process committed request", + processedRequests.peek() == writeReq); + Assert.assertTrue("Did not process following read request", + processedRequests.containsAll(shouldBeProcessedAfterPending)); + Assert.assertTrue("Did not process committed request", + 
processor.committedRequests.isEmpty()); + Assert.assertTrue("Did not process committed request", + processor.pendingRequests.isEmpty()); + } - @Override - public void shutdown() { - this.workerPool.stop(); + /** + * In the following test, we verify that committed requests are processed + * even when queuedRequests never gets empty. We add 10 committed request + * and use infinite queuedRequests. We verify that the committed request was + * processed. + */ + @Test(timeout = 1000) + public void noStarvationOfNonLocalCommittedRequestsTest() throws Exception { + final String path = "/noStarvationOfCommittedRequests"; + processor.queuedRequests = new MockRequestsQueue(); + HashSet nonLocalCommits = new HashSet(); + for (int i = 0; i < 10; i++) { + Request nonLocalCommitReq = newRequest( + new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL.toFlag()), + OpCode.create, 51, i + 1); + processor.committedRequests.add(nonLocalCommitReq); + nonLocalCommits.add(nonLocalCommitReq); + } + for (int i = 0; i < 10; i++) { + processor.initThreads(defaultSizeOfThreadPool); + processor.stoppedMainLoop = true; + processor.run(); } + Assert.assertTrue("commit request was not processed", + processedRequests.containsAll(nonLocalCommits)); } - /* - * We populate the necessary data structures in the CommitProcessor - * instance and run processCommitted + /** + * In the following test, we verify that committed writes are not causing + * reads starvation. We populate the commit processor with the following + * order of requests: 1 committed local updated, 1 read request, 100 + * committed non-local updates. 50 read requests. We verify that after the + * first call to processor.run, only the first write is processed, then + * after the second call, all reads are processed along with the second + * write. 
      */
     @Test
-    public void raceTest()
-    throws Exception {
+    public void noStarvationOfReadRequestsTest() throws Exception {
+        final String path = "/noStarvationOfReadRequests";
-        ByteArrayOutputStream boas = new ByteArrayOutputStream();
-        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
-        GetDataRequest getReq = new GetDataRequest("/testrace", false);
-        getReq.serialize(boa, "request");
-        ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
-        Request readReq = new Request(null, 0x0, 0, OpCode.getData,
-                bb, new ArrayList());
-
-        boas.reset();
-        SyncRequest syncReq = new SyncRequest("/testrace");
-        syncReq.serialize(boa, "request");
-        bb = ByteBuffer.wrap(boas.toByteArray());
-        Request writeReq = new Request(null, 0x0, 0, OpCode.sync,
-                bb, new ArrayList());
-
-        processor.addToCommittedRequests(writeReq);
-        processor.addToQueuedRequests(readReq);
-        processor.addToQueuedRequests(writeReq);
-
-        processor.testStart();
-        processor.testProcessCommitted();
-        Assert.assertFalse("Next request processor executed", executedFlag);
+        // +1 committed requests (also head of queuedRequests)
+        Request firstCommittedReq = newRequest(
+                new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                OpCode.create, 0x3, 1);
+        processor.queuedRequests.add(firstCommittedReq);
+        processor.committedRequests.add(firstCommittedReq);
+        HashSet allReads = new HashSet();
+
+        // +1 read request to queuedRequests
+        Request firstRead = newRequest(new GetDataRequest(path, false),
+                OpCode.getData, 0x1, 0);
+        allReads.add(firstRead);
+        processor.queuedRequests.add(firstRead);
+
+        // +1 non local commit
+        Request secondCommittedReq = newRequest(
+                new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                OpCode.create, 0x99, 2);
+        processor.committedRequests.add(secondCommittedReq);
+
+        HashSet waitingCommittedRequests = new HashSet();
+        // +99 non local committed requests
+        for (int writeReqId = 3; writeReqId < 102; ++writeReqId) {
+            Request writeReq = newRequest(
+                    new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                    OpCode.create, 0x8, writeReqId);
+            processor.committedRequests.add(writeReq);
+            waitingCommittedRequests.add(writeReq);
+        }
+
+        // +50 read requests to queuedRequests
+        for (int readReqId = 1; readReqId <= 50; ++readReqId) {
+            Request readReq = newRequest(new GetDataRequest(path, false),
+                    OpCode.getData, 0x5, readReqId);
+            allReads.add(readReq);
+            processor.queuedRequests.add(readReq);
+        }
+
+        processor.initThreads(defaultSizeOfThreadPool);
+
+        processor.stoppedMainLoop = true;
+        processor.run();
+        Assert.assertTrue("Did not process the first write request",
+                processedRequests.contains(firstCommittedReq));
+        for (Request r : allReads) {
+            Assert.assertTrue("Processed read request",
+                    !processedRequests.contains(r));
+        }
+        processor.run();
+        Assert.assertTrue("did not processed all reads",
+                processedRequests.containsAll(allReads));
+        Assert.assertTrue("Did not process the second write request",
+                processedRequests.contains(secondCommittedReq));
+        for (Request r : waitingCommittedRequests) {
+            Assert.assertTrue("Processed additional committed request",
+                    !processedRequests.contains(r));
+        }
     }
 }
\ No newline at end of file

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java?rev=1743978&r1=1743977&r2=1743978&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorTest.java Sun May 15 21:31:54 2016
@@ -24,27 +24,24 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.zookeeper.ZKTestCase;
-import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.proto.CreateRequest;
 import org.apache.zookeeper.proto.GetDataRequest;
 import org.apache.zookeeper.server.FinalRequestProcessor;
 import org.apache.zookeeper.server.PrepRequestProcessor;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
-import org.apache.zookeeper.server.SessionTracker;
 import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.quorum.CommitProcessor;
 import org.apache.zookeeper.test.ClientBase;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -74,27 +71,52 @@ public class CommitProcessorTest extends
     protected static final Logger LOG =
         LoggerFactory.getLogger(CommitProcessorTest.class);
-
+    // The amount of ms each test case should run
+    static final int TEST_RUN_TIME_IN_MS = 5000;
     private AtomicInteger processedReadRequests = new AtomicInteger(0);
     private AtomicInteger processedWriteRequests = new AtomicInteger(0);
+    boolean stopped;
     TestZooKeeperServer zks;
     File tmpDir;
     ArrayList testClients =
-        new ArrayList();
+        new ArrayList();
+    CommitProcessor commitProcessor;
-    public void setUp(int numCommitThreads, int numClientThreads)
+    public void setUp(int numCommitThreads, int numClientThreads, int writePercent)
             throws Exception {
+        stopped = false;
         System.setProperty(
             CommitProcessor.ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS,
             Integer.toString(numCommitThreads));
-        System.setProperty("zookeeper.admin.enableServer", "false");
         tmpDir = ClientBase.createTmpDir();
         ClientBase.setupTestEnv();
         zks = new TestZooKeeperServer(tmpDir, tmpDir, 4000);
         zks.startup();
         for(int i=0; i());
-            zks.firstProcessor.processRequest(req);
+            zks.getFirstProcessor().processRequest(req);
+
         }
         public void sendReadRequest() throws Exception {
@@ -146,20 +176,20 @@ public class CommitProcessorTest extends
             ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
             Request req = new Request(null, sessionId, ++cxid,
                     OpCode.getData, bb, new ArrayList());
-            zks.firstProcessor.processRequest(req);
+            zks.getFirstProcessor().processRequest(req);
         }
 
         public void run() {
             Random rand = new Random(Thread.currentThread().getId());
             try {
                 sendWriteRequest();
-                for(int i=0; i<1000; ++i) {
-                    // Do 25% write / 75% read request mix
-                    if (rand.nextInt(100) < 25) {
+                while(!stopped) {
+                    if (rand.nextInt(100) < writePercent) {
                         sendWriteRequest();
                     } else {
                         sendReadRequest();
                     }
+                    Thread.sleep(5 + rand.nextInt(95));
                 }
             } catch (Exception e) {
                 LOG.error("Uncaught exception in test: ", e);
@@ -168,41 +198,85 @@ public class CommitProcessorTest extends
     }
 
     @Test
-    public void testNoCommitWorkers() throws Exception {
-        setUp(0, 10);
-        synchronized(this) {
-            wait(5000);
+    public void testNoCommitWorkersReadOnlyWorkload() throws Exception {
+        int numClients = 10;
+        LOG.info("testNoCommitWorkersReadOnlyWorkload");
+        setUp(0, numClients, 0);
+        synchronized (this) {
+            wait(TEST_RUN_TIME_IN_MS);
         }
+        Assert.assertFalse(fail);
+        Assert.assertTrue("No read requests processed", processedReadRequests.get() > 0);
+        // processedWriteRequests.get() == numClients since each client performs one write at the beginning (creates a znode)
+        Assert.assertTrue("Write requests processed", processedWriteRequests.get() == numClients);
+    }
+
+    @Test
+    public void testNoCommitWorkersMixedWorkload() throws Exception {
+        int numClients = 10;
+        LOG.info("testNoCommitWorkersMixedWorkload 25w/75r workload test");
+        setUp(0, numClients, 25);
+        synchronized (this) {
+            wait(TEST_RUN_TIME_IN_MS);
+        }
+        Assert.assertFalse(fail);
         checkProcessedRequest();
+    }
+
+    @Test
+    public void testOneCommitWorkerReadOnlyWorkload() throws Exception {
+        int numClients = 10;
+        LOG.info("testOneCommitWorkerReadOnlyWorkload");
+        setUp(1, numClients, 0);
+        synchronized (this) {
+            wait(TEST_RUN_TIME_IN_MS);
+        }
         Assert.assertFalse(fail);
+        Assert.assertTrue("No read requests processed", processedReadRequests.get() > 0);
+        // processedWriteRequests.get() == numClients since each client performs one write at the beginning (creates a znode)
+        Assert.assertTrue("Write requests processed", processedWriteRequests.get() == numClients);
     }
 
     @Test
-    public void testOneCommitWorker() throws Exception {
-        setUp(1, 10);
+    public void testOneCommitWorkerMixedWorkload() throws Exception {
+        setUp(1, 10, 25);
+        LOG.info("testOneCommitWorkerMixedWorkload 25w/75r workload test");
         synchronized(this) {
-            wait(5000);
+            wait(TEST_RUN_TIME_IN_MS);
         }
+        Assert.assertFalse(fail);
         checkProcessedRequest();
+    }
+
+
+    @Test
+    public void testManyCommitWorkersReadOnly() throws Exception {
+        int numClients = 10;
+        LOG.info("testManyCommitWorkersReadOnly");
+        setUp(10, numClients, 0);
+        synchronized(this) {
+            wait(TEST_RUN_TIME_IN_MS);
+        }
         Assert.assertFalse(fail);
+        Assert.assertTrue("No read requests processed", processedReadRequests.get() > 0);
+        // processedWriteRequests.get() == numClients since each client performs one write at the beginning (creates a znode)
+        Assert.assertTrue("Write requests processed", processedWriteRequests.get() == numClients);
     }
-
+
     @Test
-    public void testManyCommitWorkers() throws Exception {
-        setUp(10, 10);
+    public void testManyCommitWorkersMixedWorkload() throws Exception {
+        setUp(16, 8 , 8, 25);
+        LOG.info("testManyCommitWorkersMixedWorkload 8X0w/100r + 8X25w/75r workload test");
         synchronized(this) {
-            wait(5000);
+            wait(TEST_RUN_TIME_IN_MS);
         }
-        checkProcessedRequest();
         Assert.assertFalse(fail);
-
+        checkProcessedRequest();
     }
 
     private void checkProcessedRequest() {
-        Assert.assertTrue("No read requests processed",
-                processedReadRequests.get() > 0);
-        Assert.assertTrue("No write requests processed",
-                processedWriteRequests.get() > 0);
+        Assert.assertTrue("No read requests processed", processedReadRequests.get() > 0);
+        Assert.assertTrue("No write requests processed", processedWriteRequests.get() > 0);
     }
 
     volatile boolean fail = false;
@@ -213,16 +287,13 @@ public class CommitProcessorTest extends
     }
 
     private class TestZooKeeperServer extends ZooKeeperServer {
-        PrepRequestProcessor firstProcessor;
-        CommitProcessor commitProcessor;
-
         public TestZooKeeperServer(File snapDir, File logDir, int tickTime)
                 throws IOException {
             super(snapDir, logDir, tickTime);
         }
 
-        public SessionTracker getSessionTracker() {
-            return sessionTracker;
+        public PrepRequestProcessor getFirstProcessor(){
+            return (PrepRequestProcessor) firstProcessor;
         }
 
         // Leader mock: Prep -> MockProposal -> Commit -> validate -> Final
@@ -234,18 +305,17 @@ public class CommitProcessorTest extends
             // processor, so it can do pre/post validating of requests
             ValidateProcessor validateProcessor =
                 new ValidateProcessor(finalProcessor);
-            commitProcessor = new CommitProcessor(validateProcessor, "1", true,
-                    getZooKeeperServerListener());
+            commitProcessor = new CommitProcessor(validateProcessor, "1", true, null);
             validateProcessor.setCommitProcessor(commitProcessor);
             commitProcessor.start();
             MockProposalRequestProcessor proposalProcessor =
                 new MockProposalRequestProcessor(commitProcessor);
             proposalProcessor.start();
             firstProcessor = new PrepRequestProcessor(zks, proposalProcessor);
-            firstProcessor.start();
+            getFirstProcessor().start();
         }
     }
-
+
     private class MockProposalRequestProcessor extends Thread
             implements RequestProcessor {
         private final CommitProcessor commitProcessor;
@@ -261,9 +331,12 @@ public class CommitProcessorTest extends
             Random rand = new Random(Thread.currentThread().getId());
             try {
                 while(true) {
-                    Request request = proposals.take();
-                    Thread.sleep(10 + rand.nextInt(190));
-                    commitProcessor.commit(request);
+                    // If it is a read-only test, there will be no proposals..
+                    if (!proposals.isEmpty()){
+                        Request request = proposals.take();
+                        Thread.sleep(5 + rand.nextInt(95));
+                        commitProcessor.commit(request);
+                    }
                 }
             } catch (InterruptedException e) {
                 // ignore
@@ -282,11 +355,15 @@ public class CommitProcessorTest extends
 
         @Override
         public void shutdown() {
-            // TODO Auto-generated method stub
-
+            LOG.info("shutdown MockProposalRequestProcessor");
+            proposals.clear();
+            if (commitProcessor != null) {
+                commitProcessor.shutdown();
+            }
         }
     }
 
+
     private class ValidateProcessor implements RequestProcessor {
         Random rand = new Random(Thread.currentThread().getId());
         RequestProcessor nextProcessor;
@@ -310,6 +387,14 @@ public class CommitProcessorTest extends
         @Override
         public void processRequest(Request request)
                 throws RequestProcessorException {
+            if (stopped)
+                return;
+            if (request.type == OpCode.closeSession){
+                LOG.debug("ValidateProcessor got closeSession request=" + request);
+                nextProcessor.processRequest(request);
+                return;
+            }
+
             boolean isWriteRequest = commitProcessor.needCommit(request);
             if (isWriteRequest) {
                 outstandingWriteRequests.incrementAndGet();
@@ -322,15 +407,14 @@ public class CommitProcessorTest extends
                 outstandingReadRequests.incrementAndGet();
                 validateReadRequestVariant(request);
             }
-
+
             // Insert random delay to test thread race conditions
             try {
-                Thread.sleep(10 + rand.nextInt(290));
+                Thread.sleep(5 + rand.nextInt(25));
             } catch(InterruptedException e) {
                 // ignore
             }
             nextProcessor.processRequest(request);
-
             /*
              * The commit workers will have to execute this line before they
              * wake up the commit processor. So this value is up-to-date when
@@ -354,6 +438,8 @@ public class CommitProcessorTest extends
          * Validate that this is the only request in the pipeline
          */
         private void validateWriteRequestVariant(Request request) {
+            if (stopped)
+                return;
             long zxid = request.getHdr().getZxid();
             int readRequests = outstandingReadRequests.get();
             if (readRequests != 0) {
@@ -384,7 +470,9 @@ public class CommitProcessorTest extends
         }
 
         private void validateRequest(Request request) {
-            LOG.info("Got request " + request);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Got request " + request);
+            }
 
             // Zxids should always be in order for write requests
             if (request.getHdr() != null) {
@@ -418,8 +506,12 @@ public class CommitProcessorTest extends
 
         @Override
         public void shutdown() {
-            // TODO Auto-generated method stub
+            LOG.info("shutdown validateReadRequestVariant");
+            cxidMap.clear();
+            expectedZxid = new AtomicLong(1);
+            if (nextProcessor!=null){
+                nextProcessor.shutdown();
+            }
         }
     }
-
-}
+}
\ No newline at end of file