This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 9f13268 Revert "ZOOKEEPER-3598: Fix potential data inconsistency issue due to CommitProcessor
not gracefully shutdown"
9f13268 is described below
commit 9f13268567dfcc26a5f59a7449aeec21fb5e16b3
Author: Andor Molnar <andor@apache.org>
AuthorDate: Fri Nov 15 10:56:42 2019 +0100
Revert "ZOOKEEPER-3598: Fix potential data inconsistency issue due to CommitProcessor
not gracefully shutdown"
This reverts commit 79f99af81842f415b97e1c3c18c953df5bd129b2.
---
.../java/org/apache/zookeeper/server/ExitCode.java | 5 +-
.../zookeeper/server/quorum/CommitProcessor.java | 15 ----
.../server/quorum/CommitProcessorTest.java | 80 +---------------------
3 files changed, 3 insertions(+), 97 deletions(-)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ExitCode.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ExitCode.java
index 810be27..67af2c8 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ExitCode.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ExitCode.java
@@ -48,10 +48,7 @@ public enum ExitCode {
QUORUM_PACKET_ERROR(13),
/** Unable to bind to the quorum (election) port after multiple retry */
- UNABLE_TO_BIND_QUORUM_PORT(14),
-
- /** Failed to shutdown the request processor pipeline gracefully **/
- SHUTDOWN_UNGRACEFULLY(16);
+ UNABLE_TO_BIND_QUORUM_PORT(14);
private final int value;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
index 8044e65..4e52187 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
@@ -29,7 +29,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.common.Time;
-import org.apache.zookeeper.server.ExitCode;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.ServerMetrics;
@@ -622,20 +621,6 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
RequestP
workerPool.join(workerShutdownTimeoutMS);
}
- try {
- this.join(workerShutdownTimeoutMS);
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while waiting for CommitProcessor to finish");
- Thread.currentThread().interrupt();
- }
-
- if (this.isAlive()) {
- LOG.warn("CommitProcessor does not shutdown gracefully after "
- + "waiting for {} ms, exit to avoid potential "
- + "inconsistency issue", workerShutdownTimeoutMS);
- System.exit(ExitCode.SHUTDOWN_UNGRACEFULLY.getValue());
- }
-
if (nextProcessor != null) {
nextProcessor.shutdown();
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java
index e83ed73..d939dc0 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java
@@ -28,9 +28,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.jute.BinaryOutputArchive;
@@ -84,18 +82,13 @@ public class CommitProcessorTest extends ZKTestCase {
File tmpDir;
ArrayList<TestClientThread> testClients = new ArrayList<TestClientThread>();
CommitProcessor commitProcessor;
- DelayRequestProcessor delayProcessor;
public void setUp(int numCommitThreads, int numClientThreads, int writePercent) throws
Exception {
- setUp(numCommitThreads, numClientThreads, writePercent, false);
- }
-
- public void setUp(int numCommitThreads, int numClientThreads, int writePercent, boolean
withDelayProcessor) throws Exception {
stopped = false;
System.setProperty(CommitProcessor.ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS, Integer.toString(numCommitThreads));
tmpDir = ClientBase.createTmpDir();
ClientBase.setupTestEnv();
- zks = new TestZooKeeperServer(tmpDir, tmpDir, 4000, withDelayProcessor);
+ zks = new TestZooKeeperServer(tmpDir, tmpDir, 4000);
zks.startup();
for (int i = 0; i < numClientThreads; ++i) {
TestClientThread client = new TestClientThread(writePercent);
@@ -219,23 +212,6 @@ public class CommitProcessorTest extends ZKTestCase {
}
@Test
- public void testWaitingForWriteToFinishBeforeShutdown() throws Exception {
- setUp(1, 0, 0, true);
-
- // send a single write request
- TestClientThread client = new TestClientThread(0);
- client.sendWriteRequest();
-
- // wait for request being committed
- delayProcessor.waitRequestProcessing();
-
- zks.shutdown();
-
- // Make sure we've finished the in-flight request before shutdown returns
- assertFalse(commitProcessor.isAlive());
- }
-
- @Test
public void testNoCommitWorkersMixedWorkload() throws Exception {
int numClients = 10;
LOG.info("testNoCommitWorkersMixedWorkload 25w/75r workload test");
@@ -311,15 +287,8 @@ public class CommitProcessorTest extends ZKTestCase {
private class TestZooKeeperServer extends ZooKeeperServer {
- final boolean withDelayProcessor;
-
public TestZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException
{
- this(snapDir, logDir, tickTime, false);
- }
-
- public TestZooKeeperServer(File snapDir, File logDir, int tickTime, boolean withDelayProcessor)
throws IOException {
super(snapDir, logDir, tickTime);
- this.withDelayProcessor = withDelayProcessor;
}
public PrepRequestProcessor getFirstProcessor() {
@@ -334,12 +303,7 @@ public class CommitProcessorTest extends ZKTestCase {
// ValidateProcessor is set up in a similar fashion to ToBeApplied
// processor, so it can do pre/post validating of requests
ValidateProcessor validateProcessor = new ValidateProcessor(finalProcessor);
- if (withDelayProcessor) {
- delayProcessor = new DelayRequestProcessor(validateProcessor);
- commitProcessor = new CommitProcessor(delayProcessor, "1", true, null);
- } else {
- commitProcessor = new CommitProcessor(validateProcessor, "1", true, null);
- }
+ commitProcessor = new CommitProcessor(validateProcessor, "1", true, null);
validateProcessor.setCommitProcessor(commitProcessor);
commitProcessor.start();
MockProposalRequestProcessor proposalProcessor = new MockProposalRequestProcessor(commitProcessor);
@@ -350,46 +314,6 @@ public class CommitProcessorTest extends ZKTestCase {
}
- private class DelayRequestProcessor implements RequestProcessor {
- // delay 1s for each request
- static final int DEFAULT_DELAY = 1000;
- RequestProcessor nextProcessor;
- CountDownLatch waitingProcessRequestBeingCalled;
-
- public DelayRequestProcessor(RequestProcessor nextProcessor) {
- this.nextProcessor = nextProcessor;
- this.waitingProcessRequestBeingCalled = new CountDownLatch(1);
- }
-
- @Override
- public void processRequest(Request request) throws RequestProcessorException {
- try {
- this.waitingProcessRequestBeingCalled.countDown();
- LOG.info("Sleeping {} ms for request {}", DEFAULT_DELAY, request);
- Thread.sleep(DEFAULT_DELAY);
- } catch (InterruptedException e) { /* ignore */ }
- nextProcessor.processRequest(request);
- }
-
- public void waitRequestProcessing() {
- try {
- if (!waitingProcessRequestBeingCalled.await(3000, TimeUnit.MILLISECONDS))
{
- LOG.info("Did not see request processing in 3s");
- }
- } catch (InterruptedException e) {
- LOG.info("Interrupted when waiting for processRequest being called");
- }
- }
-
- @Override
- public void shutdown() {
- LOG.info("shutdown DelayRequestProcessor");
- if (nextProcessor != null) {
- nextProcessor.shutdown();
- }
- }
- }
-
private class MockProposalRequestProcessor extends Thread implements RequestProcessor
{
private final CommitProcessor commitProcessor;
|