zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sy...@apache.org
Subject [zookeeper] branch master updated: ZOOKEEPER-3796: Skip Learner Request made to ObserverMaster from going
Date Tue, 05 May 2020 19:21:14 GMT
This is an automated email from the ASF dual-hosted git repository.

symat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 05cd214  ZOOKEEPER-3796: Skip Learner Request made to ObserverMaster from going
05cd214 is described below

commit 05cd214a0cc9c870de373b54cfeb47a2a75efd28
Author: Mayank Tuteja <mayank99@fb.com>
AuthorDate: Tue May 5 19:20:35 2020 +0000

    ZOOKEEPER-3796: Skip Learner Request made to ObserverMaster from going
    
    ... to next processor
    
    Author: mayank99 <mayank99@devvm2541.prn3.facebook.com>
    Author: mayank99fb <63824268+mayank99fb@users.noreply.github.com>
    
    Reviewers: Andor Molnar <andor@apache.org>, Enrico Olivelli <eolivelli@apache.org>, Mate Szalay-Beko <symat@apache.org>
    
    Closes #1322 from mayank99fb/ZOOKEEPER-3796
---
 .../src/main/resources/markdown/zookeeperAdmin.md  |   9 +
 .../java/org/apache/zookeeper/server/Request.java  |   5 +
 .../org/apache/zookeeper/server/ServerMetrics.java |   6 +
 .../server/quorum/FollowerRequestProcessor.java    |  19 +-
 .../server/FollowerRequestProcessorTest.java       |  89 ++++++++++
 .../apache/zookeeper/test/ObserverMasterTest.java  | 196 +--------------------
 .../zookeeper/test/ObserverMasterTestBase.java     | 120 +++++++++++++
 7 files changed, 249 insertions(+), 195 deletions(-)

diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index 6c616d1..2a9d7d3 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -1653,6 +1653,15 @@ New features that are currently considered experimental.
     values and see changes from other clients. See
     ZOOKEEPER-784 for more details.
 
+* *zookeeper.follower.skipLearnerRequestToNextProcessor* :
+    (Java system property: **zookeeper.follower.skipLearnerRequestToNextProcessor**)
+    When our cluster has observers which are connected with ObserverMaster, then turning on this flag might help
+    you reduce some memory pressure on the Observer Master. If your cluster doesn't have any observers or
+    they are not connected with ObserverMaster or your Observer's don't make much writes, then using this flag
+    won't help you.
+    Currently the change here is guarded behind the flag to help us get more confidence around the memory gains.
+    In Long run, we might want to remove this flag and set its behavior as the default codepath.
+
 <a name="Unsafe+Options"></a>
 
 #### Unsafe Options
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index 43a68ac..4296471 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -27,6 +27,7 @@ import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.metrics.Summary;
 import org.apache.zookeeper.metrics.SummarySet;
+import org.apache.zookeeper.server.quorum.LearnerHandler;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.AuthUtil;
 import org.apache.zookeeper.txn.TxnDigest;
@@ -490,4 +491,8 @@ public class Request {
     public void setTxnDigest(TxnDigest txnDigest) {
         this.txnDigest = txnDigest;
     }
+
+    public boolean isFromLearner() {
+        return owner instanceof LearnerHandler;
+    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index 7ea7010..36a65df 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -237,7 +237,10 @@ public final class ServerMetrics {
 
         CNXN_CLOSED_WITHOUT_ZK_SERVER_RUNNING = metricsContext.getCounter("cnxn_closed_without_zk_server_running");
 
+        SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR_COUNT = metricsContext.getCounter("skip_learner_request_to_next_processor_count");
+
         SOCKET_CLOSING_TIME = metricsContext.getSummary("socket_closing_time", DetailLevel.BASIC);
+
     }
 
     /**
@@ -458,8 +461,11 @@ public final class ServerMetrics {
 
     public final Counter CNXN_CLOSED_WITHOUT_ZK_SERVER_RUNNING;
 
+    public final Counter SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR_COUNT;
+
     public final Summary SOCKET_CLOSING_TIME;
 
+
     private final MetricsProvider metricsProvider;
 
     public void resetAll() {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
index db51aee..90c4d49 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
@@ -24,6 +24,7 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.ZooKeeperCriticalThread;
 import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.txn.ErrorTxn;
@@ -38,6 +39,10 @@ public class FollowerRequestProcessor extends ZooKeeperCriticalThread implements
 
     private static final Logger LOG = LoggerFactory.getLogger(FollowerRequestProcessor.class);
 
+    public static final String SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR = "zookeeper.follower.skipLearnerRequestToNextProcessor";
+
+    private final boolean skipLearnerRequestToNextProcessor;
+
     FollowerZooKeeperServer zks;
 
     RequestProcessor nextProcessor;
@@ -50,6 +55,9 @@ public class FollowerRequestProcessor extends ZooKeeperCriticalThread implements
         super("FollowerRequestProcessor:" + zks.getServerId(), zks.getZooKeeperServerListener());
         this.zks = zks;
         this.nextProcessor = nextProcessor;
+        this.skipLearnerRequestToNextProcessor = Boolean.getBoolean(SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR);
+        LOG.info("Initialized FollowerRequestProcessor with {} as {}", SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR,
+                skipLearnerRequestToNextProcessor);
     }
 
     @Override
@@ -72,7 +80,8 @@ public class FollowerRequestProcessor extends ZooKeeperCriticalThread implements
                 // We want to queue the request to be processed before we submit
                 // the request to the leader so that we are ready to receive
                 // the response
-                nextProcessor.processRequest(request);
+                maybeSendRequestToNextProcessor(request);
+
                 if (request.isThrottled()) {
                     continue;
                 }
@@ -115,6 +124,14 @@ public class FollowerRequestProcessor extends ZooKeeperCriticalThread implements
         LOG.info("FollowerRequestProcessor exited loop!");
     }
 
+    private void maybeSendRequestToNextProcessor(Request request) throws RequestProcessorException {
+        if (skipLearnerRequestToNextProcessor && request.isFromLearner()) {
+            ServerMetrics.getMetrics().SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR_COUNT.add(1);
+        } else {
+            nextProcessor.processRequest(request);
+        }
+    }
+
     public void processRequest(Request request) {
         processRequest(request, true);
     }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/FollowerRequestProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/FollowerRequestProcessorTest.java
new file mode 100644
index 0000000..dd81c77
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/FollowerRequestProcessorTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.util.PortForwarder;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.ObserverMasterTestBase;
+import org.junit.After;
+import org.junit.Test;
+
+public class FollowerRequestProcessorTest extends ObserverMasterTestBase {
+
+    private PortForwarder forwarder;
+
+    @Test
+    public void testFollowerRequestProcessorSkipsLearnerRequestToNextProcessor() throws Exception {
+        setupTestObserverServer("true");
+
+        zk.create("/testFollowerSkipNextAProcessor", "test".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        assertEquals("test", new String(zk.getData("/testFollowerSkipNextAProcessor", null, null)));
+        assertEquals(1L, ServerMetrics.getMetrics().SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR_COUNT.get());
+    }
+
+    @Test
+    public void testFollowerRequestProcessorSendsLearnerRequestToNextProcessor() throws Exception {
+        setupTestObserverServer("false");
+
+        zk.create("/testFollowerSkipNextAProcessor", "test".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        assertEquals("test", new String(zk.getData("/testFollowerSkipNextAProcessor", null, null)));
+        assertEquals(0L, ServerMetrics.getMetrics().SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR_COUNT.get());
+    }
+
+    private void setupTestObserverServer(String skipLearnerRequestToNextProcessor) throws Exception {
+        System.setProperty(FollowerRequestProcessor.SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR, skipLearnerRequestToNextProcessor);
+
+        // Setup Ensemble with observer master port so that observer connects with Observer master and not the leader
+        final int OM_PROXY_PORT = PortAssignment.unique();
+        forwarder = setUp(OM_PROXY_PORT, true);
+
+        q3.start();
+        assertTrue(
+            "waiting for server 3 being up",
+            ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, CONNECTION_TIMEOUT));
+
+        // Connect with observer zookeeper
+        zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT, this);
+        waitForOne(zk, States.CONNECTED);
+
+        // Clear all service metrics collected so far
+        ServerMetrics.getMetrics().resetAll();
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        System.setProperty(FollowerRequestProcessor.SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR, "false");
+
+        shutdown();
+        if (forwarder != null) {
+            forwarder.shutdown();
+        }
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java
index 620953a..cfac042 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java
@@ -50,8 +50,6 @@ import org.apache.zookeeper.DummyWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
 import org.apache.zookeeper.PortAssignment;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
@@ -60,10 +58,7 @@ import org.apache.zookeeper.admin.ZooKeeperAdmin;
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.jmx.ZKMBeanInfo;
 import org.apache.zookeeper.server.admin.Commands;
-import org.apache.zookeeper.server.quorum.DelayRequestProcessor;
-import org.apache.zookeeper.server.quorum.FollowerZooKeeperServer;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
 import org.apache.zookeeper.server.util.PortForwarder;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -72,7 +67,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @RunWith(Parameterized.class)
-public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher {
+public class ObserverMasterTest extends ObserverMasterTestBase {
 
     protected static final Logger LOG = LoggerFactory.getLogger(ObserverMasterTest.class);
 
@@ -87,184 +82,8 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher {
 
     private Boolean testObserverMaster;
 
-    private CountDownLatch latch;
-    ZooKeeper zk;
-    private WatchedEvent lastEvent = null;
-
-    private int CLIENT_PORT_QP1;
-    private int CLIENT_PORT_QP2;
-    private int CLIENT_PORT_OBS;
-    private int OM_PORT;
-    private MainThread q1;
-    private MainThread q2;
-    private MainThread q3;
-
     private PortForwarder setUp(final int omProxyPort) throws IOException {
-        ClientBase.setupTestEnv();
-
-        final int PORT_QP1 = PortAssignment.unique();
-        final int PORT_QP2 = PortAssignment.unique();
-        final int PORT_OBS = PortAssignment.unique();
-        final int PORT_QP_LE1 = PortAssignment.unique();
-        final int PORT_QP_LE2 = PortAssignment.unique();
-        final int PORT_OBS_LE = PortAssignment.unique();
-
-        CLIENT_PORT_QP1 = PortAssignment.unique();
-        CLIENT_PORT_QP2 = PortAssignment.unique();
-        CLIENT_PORT_OBS = PortAssignment.unique();
-
-        OM_PORT = PortAssignment.unique();
-
-        String quorumCfgSection = "server.1=127.0.0.1:" + (PORT_QP1) + ":" + (PORT_QP_LE1) + ";" + CLIENT_PORT_QP1
-                                  + "\nserver.2=127.0.0.1:" + (PORT_QP2) + ":" + (PORT_QP_LE2) + ";" + CLIENT_PORT_QP2
-                                  + "\nserver.3=127.0.0.1:" + (PORT_OBS) + ":" + (PORT_OBS_LE) + ":observer" + ";" + CLIENT_PORT_OBS;
-        String extraCfgs = testObserverMaster
-                ? String.format("observerMasterPort=%d%n", OM_PORT)
-                : "";
-        String extraCfgsObs = testObserverMaster
-            ? String.format("observerMasterPort=%d%n", omProxyPort <= 0 ? OM_PORT : omProxyPort)
-            : "";
-
-        PortForwarder forwarder = null;
-        if (testObserverMaster && omProxyPort >= 0) {
-            forwarder = new PortForwarder(omProxyPort, OM_PORT);
-        }
-
-        q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection, extraCfgs);
-        q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection, extraCfgs);
-        q3 = new MainThread(3, CLIENT_PORT_OBS, quorumCfgSection, extraCfgsObs);
-        q1.start();
-        q2.start();
-        assertTrue(
-            "waiting for server 1 being up",
-            ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, CONNECTION_TIMEOUT));
-        assertTrue(
-            "waiting for server 2 being up",
-            ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, CONNECTION_TIMEOUT));
-        return forwarder;
-    }
-
-    private void shutdown() throws InterruptedException {
-        LOG.info("Shutting down all servers");
-        zk.close();
-
-        q1.shutdown();
-        q2.shutdown();
-        q3.shutdown();
-
-        assertTrue(
-            "Waiting for server 1 to shut down",
-            ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT));
-        assertTrue(
-            "Waiting for server 2 to shut down",
-            ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT));
-        assertTrue(
-            "Waiting for server 3 to shut down",
-            ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT));
-    }
-
-    @Test
-    public void testLaggingObserverMaster() throws Exception {
-        final int OM_PROXY_PORT = PortAssignment.unique();
-        PortForwarder forwarder = setUp(OM_PROXY_PORT);
-
-        // find the leader and observer master
-        int leaderPort;
-        MainThread leader;
-        MainThread follower;
-        if (q1.getQuorumPeer().leader != null) {
-            leaderPort = CLIENT_PORT_QP1;
-            leader = q1;
-            follower = q2;
-        } else if (q2.getQuorumPeer().leader != null) {
-            leaderPort = CLIENT_PORT_QP2;
-            leader = q2;
-            follower = q1;
-        } else {
-            throw new RuntimeException("No leader");
-        }
-
-        // ensure the observer master has commits in the queue before observer sync
-        zk = new ZooKeeper("127.0.0.1:" + leaderPort, ClientBase.CONNECTION_TIMEOUT, this);
-        for (int i = 0; i < 10; i++) {
-            zk.create("/bulk" + i, ("initial data of some size").getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        }
-        zk.close();
-
-        q3.start();
-        assertTrue(
-            "waiting for server 3 being up",
-            ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, CONNECTION_TIMEOUT));
-
-        latch = new CountDownLatch(1);
-        zk = new ZooKeeper("127.0.0.1:" + leaderPort, ClientBase.CONNECTION_TIMEOUT, this);
-        latch.await();
-        assertEquals(zk.getState(), States.CONNECTED);
-
-        zk.create("/init", "first".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        final long lastLoggedZxid = leader.getQuorumPeer().getLastLoggedZxid();
-
-        // wait for change to propagate
-        waitFor("Timeout waiting for observer sync", new WaitForCondition() {
-            public boolean evaluate() {
-                return lastLoggedZxid == q3.getQuorumPeer().getLastLoggedZxid();
-            }
-        }, 30);
-
-        // simulate network fault
-        if (forwarder != null) {
-            forwarder.shutdown();
-        }
-
-        for (int i = 0; i < 10; i++) {
-            zk.create("/basic" + i, "second".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        }
-
-        DelayRequestProcessor delayRequestProcessor = null;
-        if (testObserverMaster) {
-            FollowerZooKeeperServer followerZooKeeperServer = (FollowerZooKeeperServer) follower.getQuorumPeer().getActiveServer();
-            delayRequestProcessor = DelayRequestProcessor.injectDelayRequestProcessor(followerZooKeeperServer);
-        }
-
-        zk.create("/target1", "third".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        zk.create("/target2", "third".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
-        LOG.info(
-            "observer zxid {}{} leader zxid {}",
-            Long.toHexString(q3.getQuorumPeer().getLastLoggedZxid()),
-            (testObserverMaster ? "" : " observer master zxid " + Long.toHexString(follower.getQuorumPeer().getLastLoggedZxid())),
-            Long.toHexString(leader.getQuorumPeer().getLastLoggedZxid()));
-
-        // restore network
-        forwarder = testObserverMaster ? new PortForwarder(OM_PROXY_PORT, OM_PORT) : null;
-
-        assertTrue(
-            "waiting for server 3 being up",
-            ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, CONNECTION_TIMEOUT));
-        assertNotNull("Leader switched", leader.getQuorumPeer().leader);
-
-        if (delayRequestProcessor != null) {
-            delayRequestProcessor.unblockQueue();
-        }
-
-        latch = new CountDownLatch(1);
-        ZooKeeper obsZk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT, this);
-        latch.await();
-        zk.create("/finalop", "fourth".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
-        assertEquals("first", new String(obsZk.getData("/init", null, null)));
-        assertEquals("third", new String(obsZk.getData("/target1", null, null)));
-
-        obsZk.close();
-        shutdown();
-
-        try {
-            if (forwarder != null) {
-                forwarder.shutdown();
-            }
-        } catch (Exception e) {
-            // ignore
-        }
+        return setUp(omProxyPort, testObserverMaster);
     }
 
     /**
@@ -678,17 +497,6 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher {
         s1.shutdown();
     }
 
-    /**
-     * Implementation of watcher interface.
-     */
-    public void process(WatchedEvent event) {
-        lastEvent = event;
-        if (latch != null) {
-            latch.countDown();
-        }
-        LOG.info("Latch got event :: {}", event);
-    }
-
     class AsyncWriter implements Runnable {
 
         private final ZooKeeper client;
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTestBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTestBase.java
new file mode 100644
index 0000000..505378a
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTestBase.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
+import org.apache.zookeeper.server.util.PortForwarder;
+
+public class ObserverMasterTestBase extends QuorumPeerTestBase implements Watcher {
+
+    protected CountDownLatch latch;
+    protected ZooKeeper zk;
+    protected int CLIENT_PORT_QP1;
+    protected int CLIENT_PORT_QP2;
+    protected int CLIENT_PORT_OBS;
+    protected int OM_PORT;
+    protected MainThread q1;
+    protected MainThread q2;
+    protected MainThread q3;
+    protected WatchedEvent lastEvent = null;
+
+    protected PortForwarder setUp(final int omProxyPort, final Boolean testObserverMaster) throws IOException {
+        ClientBase.setupTestEnv();
+        final int PORT_QP1 = PortAssignment.unique();
+        final int PORT_QP2 = PortAssignment.unique();
+        final int PORT_OBS = PortAssignment.unique();
+        final int PORT_QP_LE1 = PortAssignment.unique();
+        final int PORT_QP_LE2 = PortAssignment.unique();
+        final int PORT_OBS_LE = PortAssignment.unique();
+
+        CLIENT_PORT_QP1 = PortAssignment.unique();
+        CLIENT_PORT_QP2 = PortAssignment.unique();
+        CLIENT_PORT_OBS = PortAssignment.unique();
+
+        OM_PORT = PortAssignment.unique();
+
+        String quorumCfgSection =
+                "server.1=127.0.0.1:" + (PORT_QP1)
+                        + ":" + (PORT_QP_LE1) + ";" +  CLIENT_PORT_QP1
+                        + "\nserver.2=127.0.0.1:" + (PORT_QP2)
+                        + ":" + (PORT_QP_LE2) + ";" + CLIENT_PORT_QP2
+                        + "\nserver.3=127.0.0.1:" + (PORT_OBS)
+                        + ":" + (PORT_OBS_LE) + ":observer" + ";" + CLIENT_PORT_OBS;
+
+        String extraCfgs = testObserverMaster ? String.format("observerMasterPort=%d%n", OM_PORT) : "";
+        String extraCfgsObs = testObserverMaster ? String.format("observerMasterPort=%d%n", omProxyPort <= 0 ? OM_PORT : omProxyPort) : "";
+
+        PortForwarder forwarder = null;
+        if (testObserverMaster && omProxyPort >= 0) {
+            forwarder = new PortForwarder(omProxyPort, OM_PORT);
+        }
+
+        q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection, extraCfgs);
+        q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection, extraCfgs);
+        q3 = new MainThread(3, CLIENT_PORT_OBS, quorumCfgSection, extraCfgsObs);
+        q1.start();
+        q2.start();
+        assertTrue(
+            "waiting for server 1 being up",
+            ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, CONNECTION_TIMEOUT));
+        assertTrue(
+            "waiting for server 2 being up",
+            ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, CONNECTION_TIMEOUT));
+        return forwarder;
+    }
+
+    protected void shutdown() throws InterruptedException {
+        LOG.info("Shutting down all servers");
+
+        zk.close();
+
+        q1.shutdown();
+        q2.shutdown();
+        q3.shutdown();
+
+        assertTrue(
+            "Waiting for server 1 to shut down",
+            ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT));
+        assertTrue(
+            "Waiting for server 2 to shut down",
+            ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT));
+        assertTrue(
+            "Waiting for server 3 to shut down",
+            ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT));
+    }
+
+    /**
+     * Implementation of watcher interface.
+     */
+    public void process(WatchedEvent event) {
+        lastEvent = event;
+        if (latch != null) {
+            latch.countDown();
+        }
+        LOG.info("Latch got event :: {}", event);
+    }
+}


Mime
View raw message