zookeeper-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [zookeeper] jhuan31 commented on a change in pull request #800: ZOOKEEPER-3268: Add commit processor metrics
Date Fri, 19 Apr 2019 15:57:02 GMT
jhuan31 commented on a change in pull request #800: ZOOKEEPER-3268: Add commit processor metrics
URL: https://github.com/apache/zookeeper/pull/800#discussion_r277022479
 
 

 ##########
 File path: zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java
 ##########
 @@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.server.*;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.*;
+
+public class CommitProcessorMetricsTest extends ZKTestCase {
+    protected static final Logger LOG =
+            LoggerFactory.getLogger(CommitProcessorMetricsTest.class);
+    CommitProcessor commitProcessor; // assigned and started in setupProcessors(), shut down in tearDown()
+    DummyFinalProcessor finalProcessor; // the processor handed to commitProcessor as its next stage
+    // Latches used by the hooks in the nested test classes; every hook null-checks its latch before use.
+    CountDownLatch requestScheduled = null; // counted down by TestWorkerService.schedule() after super.schedule()
+    CountDownLatch requestSeen = null; // counted down by TestCommitProcessor.endOfIteration()
+    CountDownLatch commitSeen = null; // counted down on entry to waitForEmptyPool(); awaited by DummyFinalProcessor
+    CountDownLatch poolEmpytied = null; // counted down once super.waitForEmptyPool() has returned
+
+    @Before
+    public void setup() { // JUnit fixture: runs before each test method
+        LOG.info("setup");
+        ServerMetrics.resetAll(); // reset all server metrics; the tests below assert counts starting from 0
+    }
+
+    /**
+     * Creates the dummy final processor and the commit processor under test,
+     * stores both in the test's fields, and starts the commit processor.
+     *
+     * @param commitWorkers number of worker threads given to the TestCommitProcessor
+     * @param finalProcTime value given to DummyFinalProcessor; when positive, its
+     *                      processRequest sleeps that many milliseconds
+     */
+    public void setupProcessors(int commitWorkers, int finalProcTime) {
+        finalProcessor = new DummyFinalProcessor(finalProcTime);
+        final CommitProcessor processor = new TestCommitProcessor(finalProcessor, commitWorkers);
+        commitProcessor = processor;
+        processor.start();
+    }
+
+    /**
+     * Shuts down the commit processor created by setupProcessors() and waits
+     * for it to finish.
+     *
+     * @throws Exception if shutting down or joining the processor fails
+     */
+    @After
+    public void tearDown() throws Exception {
+        LOG.info("tearDown starting");
+
+        // A test that failed before (or inside) setupProcessors() leaves the field
+        // null; skip the shutdown so that the original failure is not masked by
+        // a NullPointerException thrown from here.
+        if (commitProcessor == null) {
+            return;
+        }
+        commitProcessor.shutdown();
+        commitProcessor.join();
+    }
+
+    /**
+     * CommitProcessor subclass that installs a TestWorkerService as its worker
+     * pool and counts down the enclosing test's latches (when they are set) from
+     * the endOfIteration and waitForEmptyPool hooks.
+     */
+    private class TestCommitProcessor extends CommitProcessor {
+        int numWorkerThreads;
+
+        /**
+         * @param finalProcessor   processor passed to the CommitProcessor constructor
+         * @param numWorkerThreads size of the TestWorkerService created in start()
+         */
+        public TestCommitProcessor(RequestProcessor finalProcessor, int numWorkerThreads) {
+            super(finalProcessor, "1", true, null);
+            this.numWorkerThreads = numWorkerThreads;
+        }
+
+        @Override
+        public void start() {
+            super.workerPool = new TestWorkerService(numWorkerThreads);
+            super.start();
+            // the sleep is needed to make sure that the thread is in the wait status
+            // and it won't start to process a request before the required countdown latch
+            // is set
+            try {
+                Thread.sleep(50);
+            } catch (InterruptedException e) {
+                // Do not swallow the interrupt: restore the flag so the caller can see it.
+                Thread.currentThread().interrupt();
+            }
+            LOG.info("numWorkerThreads in Test is {}", numWorkerThreads);
+        }
+
+        /** Signals requestSeen, if one is set. */
+        @Override
+        protected void endOfIteration() {
+            if (requestSeen != null) {
+                requestSeen.countDown();
+            }
+        }
+
+        /**
+         * Signals commitSeen before delegating to the superclass, and
+         * poolEmpytied after the superclass call has returned.
+         */
+        @Override
+        protected void waitForEmptyPool() throws InterruptedException {
+            if (commitSeen != null) {
+                commitSeen.countDown();
+            }
+            super.waitForEmptyPool();
+            if (poolEmpytied != null) {
+                poolEmpytied.countDown();
+            }
+        }
+    }
+
+    /**
+     * WorkerService that counts down the enclosing test's requestScheduled
+     * latch (when one is set) after every call to schedule.
+     */
+    private class TestWorkerService extends WorkerService {
+        public TestWorkerService(int numWorkerThreads) {
+            super("CommitProcWork", numWorkerThreads, true);
+        }
+
+        @Override
+        public void schedule(WorkRequest workRequest, long id) {
+            super.schedule(workRequest, id);
+            if (requestScheduled == null) {
+                return;
+            }
+            requestScheduled.countDown();
+        }
+    }
+
+    /**
+     * Final processor stand-in. When constructed with a positive process time,
+     * processRequest first waits (up to 5 seconds) for the enclosing test's
+     * commitSeen latch, if one is set, and then sleeps for the process time.
+     * With a non-positive process time it returns immediately.
+     */
+    private class DummyFinalProcessor implements RequestProcessor {
+        int processTime;
+
+        /**
+         * @param processTime milliseconds to sleep per request; no delay when not positive
+         */
+        public DummyFinalProcessor(int processTime) {
+            this.processTime = processTime;
+        }
+
+        @Override
+        public void processRequest(Request request) {
+            if (processTime <= 0) {
+                return;
+            }
+            try {
+                // Read the field once so the null check and the await act on the same latch.
+                final CountDownLatch commitLatch = commitSeen;
+                if (commitLatch != null) {
+                    commitLatch.await(5, TimeUnit.SECONDS);
+                }
+                Thread.sleep(processTime);
+            } catch (InterruptedException e) {
+                // Stop waiting, but keep the interrupt visible to the calling thread.
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        @Override
+        public void shutdown() {
+            // nothing to release
+        }
+    }
+
+    private void checkMetrics(String metricName, long min, long max, double avg, long cnt,
long sum) {
+        Map<String, Object> values = ServerMetrics.getAllValues();
+
+        Assert.assertEquals("expected min is " + min, min, values.get("min_" + metricName));
+        Assert.assertEquals("expected max is: " + max, max, values.get("max_" + metricName));
+        Assert.assertEquals("expected avg is: " + avg, avg, (Double)values.get("avg_" + metricName),
0.001);
+        Assert.assertEquals("expected cnt is: " + cnt, cnt, values.get("cnt_" + metricName));
+        Assert.assertEquals("expected sum is: " + sum, sum, values.get("sum_" + metricName));
+    }
+
+    /**
+     * Asserts that a measured value lies within an inclusive range.
+     *
+     * @param actual     measured value
+     * @param lowerBound smallest accepted value (inclusive)
+     * @param upperBound largest accepted value (inclusive)
+     */
+    private void checkTimeMetric(long actual, long lowerBound, long upperBound) {
+        Assert.assertThat(actual, greaterThanOrEqualTo(lowerBound));
+        Assert.assertThat(actual, lessThanOrEqualTo(upperBound));
+    }
+
+    private Request createReadRequest(long sessionId, int xid) {
+        return new Request(null, sessionId, xid, ZooDefs.OpCode.getData,
+                ByteBuffer.wrap(new byte[10]), null);
+    }
+
+    private Request createWriteRequest(long sessionId, int xid) {
+        return new Request(null, sessionId, xid, ZooDefs.OpCode.setData,
+                ByteBuffer.wrap(new byte[10]), null);
+    }
+
+    private void processRequestWithWait(Request request) throws Exception {
+        requestSeen = new CountDownLatch(1);
+        commitProcessor.processRequest(request);
+        requestSeen.await(5, TimeUnit.SECONDS);
+    }
+
+    private void commitWithWait(Request request) throws Exception {
+        requestSeen = new CountDownLatch(1);
+        commitProcessor.commit(request);
+        requestSeen.await(5, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Checks the requests_in_session_queue metric while one write and two reads
+     * of the same session are submitted and the write is then committed.
+     */
+    @Test
+    public void testRequestsInSessionQueue() throws Exception {
+        setupProcessors(0, 0);
+
+        final Request write = createWriteRequest(1L, 1);
+        processRequestWithWait(write);
+
+        // a single sample with value 1
+        checkMetrics("requests_in_session_queue", 1L, 1L, 1D, 1L, 1L);
+
+        // these two read requests will be stuck in the session queue because there is a
+        // write in front of them
+        processRequestWithWait(createReadRequest(1L, 2));
+        processRequestWithWait(createReadRequest(1L, 3));
+
+        // three samples: 1, 2 and 3
+        checkMetrics("requests_in_session_queue", 1L, 3L, 2D, 3L, 6L);
+
+        commitWithWait(write);
+
+        // the commit adds a fourth sample of 3 (sum 6 -> 9, avg 9 / 4)
+        checkMetrics("requests_in_session_queue", 1L, 3L, 2.25D, 4L, 9L);
+    }
+
+    /**
+     * Checks that write_final_proc_time_ms has no sample before the write is
+     * committed and exactly one afterwards, with a maximum between 1000 and 2000
+     * when the final processor sleeps 1000 ms.
+     */
+    @Test
+    public void testWriteFinalProcTime() throws Exception {
+        setupProcessors(0, 1000);
+
+        final Request write = createWriteRequest(1L, 2);
+        processRequestWithWait(write);
+
+        // no request sent to next processor yet
+        final Map<String, Object> beforeCommit = ServerMetrics.getAllValues();
+        Assert.assertEquals(0L, beforeCommit.get("cnt_write_final_proc_time_ms"));
+
+        commitWithWait(write);
+
+        final Map<String, Object> afterCommit = ServerMetrics.getAllValues();
+        Assert.assertEquals(1L, afterCommit.get("cnt_write_final_proc_time_ms"));
+        final long maxProcTime = (long) afterCommit.get("max_write_final_proc_time_ms");
+        checkTimeMetric(maxProcTime, 1000L, 2000L);
+    }
+
+    /**
+     * Checks that one read produces exactly one read_final_proc_time_ms sample
+     * with a maximum between 1000 and 2000 when the final processor sleeps
+     * 1000 ms.
+     */
+    @Test
+    public void testReadFinalProcTime() throws Exception {
+        setupProcessors(0, 1000);
+
+        final Request read = createReadRequest(1L, 1);
+        processRequestWithWait(read);
+
+        final Map<String, Object> metrics = ServerMetrics.getAllValues();
+        Assert.assertEquals(1L, metrics.get("cnt_read_final_proc_time_ms"));
+        final long maxProcTime = (long) metrics.get("max_read_final_proc_time_ms");
+        checkTimeMetric(maxProcTime, 1000L, 2000L);
+    }
+
+    /**
+     * Checks that one read produces exactly one commit_process_time sample with
+     * a maximum between 0 and 1000.
+     */
+    @Test
+    public void testCommitProcessTime() throws Exception {
+        setupProcessors(0, 0);
+        final Request read = createReadRequest(1L, 1);
+        processRequestWithWait(read);
+
+        final Map<String, Object> metrics = ServerMetrics.getAllValues();
+        Assert.assertEquals(1L, metrics.get("cnt_commit_process_time"));
+        final long maxCommitTime = (long) metrics.get("max_commit_process_time");
+        checkTimeMetric(maxCommitTime, 0L, 1000L);
+    }
+
+    /**
+     * Checks that a commit with no matching pending request produces exactly
+     * one server_write_committed_time_ms sample with a maximum between 0 and 1000.
+     */
+    @Test
+    public void testServerWriteCommittedTime() throws Exception {
+        setupProcessors(0, 0);
+        // a commit w/o pending request is a write from other servers
+        final Request remoteWrite = createWriteRequest(1L, 1);
+        commitWithWait(remoteWrite);
+
+        final Map<String, Object> metrics = ServerMetrics.getAllValues();
+        Assert.assertEquals(1L, metrics.get("cnt_server_write_committed_time_ms"));
+        final long maxCommittedTime = (long) metrics.get("max_server_write_committed_time_ms");
+        checkTimeMetric(maxCommittedTime, 0L, 1000L);
+    }
+
+    @Test
+    public void testLocalWriteCommittedTime() throws Exception {
+        setupProcessors(0, 0);
+        Request req1 = createWriteRequest(1l, 2);
+        processRequestWithWait(req1);
+        commitWithWait(req1);
+
+        Map<String, Object> values = ServerMetrics.getAllValues();
+
+        Assert.assertEquals(1L, values.get("cnt_local_write_committed_time_ms"));
+        checkTimeMetric((long)values.get("max_local_write_committed_time_ms"), 0l, 1000l);
+
+        Request req2 = createWriteRequest(1l, 2);
+        processRequestWithWait(req2);
+        //the second write will be stuck in the session queue for at least one second
+        //but the LOCAL_WRITE_COMMITTED_TIME is from when the commit is received
+        Thread.sleep(1000);
 
 Review comment:
   Hmm... I don't think this particular sleep will lead to a flaky test. The purpose of this
sleep is not to wait for some other thread to finish something; it just delays the recording
of the metric so that I can check that the processing time is about 1 second instead of 0.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message