zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From an...@apache.org
Subject [zookeeper] branch master updated: ZOOKEEPER-3310: Add metrics for prep processor
Date Wed, 17 Apr 2019 09:06:29 GMT
This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new e57e7a3  ZOOKEEPER-3310: Add metrics for prep processor
e57e7a3 is described below

commit e57e7a371da9ba6161fa06c29683679f4271e346
Author: Jie Huang <jiehuang@fb.com>
AuthorDate: Sun Apr 14 16:05:59 2019 +0200

    ZOOKEEPER-3310: Add metrics for prep processor
    
    Author: Jie Huang <jiehuang@fb.com>
    
    Reviewers: andor@apache.org
    
    Closes #855 from jhuan31/ZOOKEEPER-3310 and squashes the following commits:
    
    5875158b4 [Jie Huang] remove sleep in tests
    2033456bb [Jie Huang] Reconstructed unit test. Add left out metric
    245663d88 [Jie Huang] ZOOKEEPER-3310: Add metrics for prep processor
    
    (cherry picked from commit bbc39c42bba74add9a625adca3ee52fb1cd02335)
    Signed-off-by: Andor Molnar <andor@apache.org>
---
 .../zookeeper/server/FinalRequestProcessor.java    |   1 +
 .../zookeeper/server/PrepRequestProcessor.java     |  11 +-
 .../java/org/apache/zookeeper/server/Request.java  |   2 +
 .../org/apache/zookeeper/server/ServerMetrics.java |   9 +
 .../server/PrepRequestProcessorMetricsTest.java    | 181 +++++++++++++++++++++
 5 files changed, 203 insertions(+), 1 deletion(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
index 8a237ff..1242516 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
@@ -126,6 +126,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                 while (!zks.outstandingChanges.isEmpty()
                        && zks.outstandingChanges.peek().zxid <= zxid) {
                     ChangeRecord cr = zks.outstandingChanges.remove();
+                    ServerMetrics.OUTSTANDING_CHANGES_REMOVED.add(1);
                     if (cr.zxid < zxid) {
                         LOG.warn("Zxid outstanding " + cr.zxid
                                  + " is less than current " + zxid);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
index d0a1ac9..38babbb 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
@@ -131,6 +131,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
     public void run() {
         try {
             while (true) {
+                ServerMetrics.PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
                 Request request = submittedRequests.take();
                 long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                 if (request.type == OpCode.ping) {
@@ -142,7 +143,9 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
                 if (Request.requestOfDeath == request) {
                     break;
                 }
+                long prepStartTime = Time.currentElapsedTime();
                 pRequest(request);
+                ServerMetrics.PREP_PROCESS_TIME.add(Time.currentElapsedTime() - prepStartTime);
             }
         } catch (RequestProcessorException e) {
             if (e.getCause() instanceof XidRolloverException) {
@@ -183,10 +186,11 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
         }
     }
 
-    private void addChangeRecord(ChangeRecord c) {
+    protected void addChangeRecord(ChangeRecord c) {
         synchronized (zks.outstandingChanges) {
             zks.outstandingChanges.add(c);
             zks.outstandingChangesForPath.put(c.path, c);
+            ServerMetrics.OUTSTANDING_CHANGES_QUEUED.add(1);
         }
     }
 
@@ -588,6 +592,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
                 // queues up this operation without being the session owner.
                 // this request is the last of the session so it should be ok
                 //zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
+                long startTime =  Time.currentElapsedTime();
                 Set<String> es = zks.getZKDatabase()
                         .getEphemerals(request.sessionId);
                 synchronized (zks.outstandingChanges) {
@@ -605,6 +610,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
 
                     zks.sessionTracker.setSessionClosing(request.sessionId);
                 }
+                ServerMetrics.CLOSE_SESSION_PREP_TIME.add(Time.currentElapsedTime() - startTime);
                 break;
             case OpCode.check:
                 zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
@@ -902,6 +908,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
             }
         }
         request.zxid = zks.getZxid();
+        ServerMetrics.PREP_PROCESSOR_QUEUE_TIME.add(Time.currentElapsedTime() - request.prepQueueStartTime);
         nextProcessor.processRequest(request);
     }
 
@@ -1005,7 +1012,9 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
     }
 
     public void processRequest(Request request) {
+        request.prepQueueStartTime =  Time.currentElapsedTime();
         submittedRequests.add(request);
+        ServerMetrics.PREP_PROCESSOR_QUEUED.add(1);
     }
 
     public void shutdown() {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index 4895d8c..f6c50bc 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -78,6 +78,8 @@ public class Request {
 
     public final long createTime = Time.currentElapsedTime();
 
+    public long prepQueueStartTime= -1;
+
     private Object owner;
 
     private KeeperException e;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index ea6514c..c27716c 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -169,6 +169,15 @@ public final class ServerMetrics {
     public final SummarySet READ_PER_NAMESPACE;
     public final Counter BYTES_RECEIVED_COUNT;
 
+    PREP_PROCESSOR_QUEUE_TIME(new AvgMinMaxPercentileCounter("prep_processor_queue_time_ms")),
+    PREP_PROCESSOR_QUEUE_SIZE(new AvgMinMaxCounter("prep_processor_queue_size")),
+    PREP_PROCESSOR_QUEUED(new SimpleCounter("prep_processor_request_queued")),
+    OUTSTANDING_CHANGES_QUEUED(new SimpleCounter("outstanding_changes_queued")),
+    OUTSTANDING_CHANGES_REMOVED(new SimpleCounter("outstanding_changes_removed")),
+    PREP_PROCESS_TIME(new AvgMinMaxCounter("prep_process_time")),
+    CLOSE_SESSION_PREP_TIME(new AvgMinMaxPercentileCounter("close_session_prep_time")),
+
+
     /**
      * Fired watcher stats.
      */
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java
new file mode 100644
index 0000000..75b1d6c
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import jline.internal.Log;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.data.StatPersisted;
+import org.apache.zookeeper.proto.DeleteRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
+import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.QuorumUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.number.OrderingComparison.greaterThan;
+import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.*;
+
+public class PrepRequestProcessorMetricsTest extends ZKTestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(PrepRequestProcessorMetricsTest.class);
+
+    ZooKeeperServer zks;
+    RequestProcessor nextProcessor;
+
+    @Before
+    public void setup() {
+        zks = spy(new ZooKeeperServer());
+        zks.sessionTracker = mock(SessionTracker.class);
+
+        ZKDatabase db = mock(ZKDatabase.class);
+        when(zks.getZKDatabase()).thenReturn(db);
+
+        DataNode node = new DataNode(new byte[1], null, mock(StatPersisted.class));
+        when(db.getNode(anyString())).thenReturn(node);
+
+        Set<String> ephemerals = new HashSet<>();
+        ephemerals.add("/crystalmountain");
+        ephemerals.add("/stevenspass");
+        when(db.getEphemerals(anyLong())).thenReturn(ephemerals);
+
+        nextProcessor = mock(RequestProcessor.class);
+        ServerMetrics.resetAll();
+    }
+
+    private Request createRequest(Record record, int opCode) throws IOException {
+        // encoding
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        record.serialize(boa, "request");
+        baos.close();
+        return new Request(null, 1l, 0, opCode, ByteBuffer.wrap(baos.toByteArray()), null);
+    }
+
+    private Request createRequest(String path, int opCode) throws IOException {
+        Record record;
+        switch (opCode) {
+            case ZooDefs.OpCode.setData:
+                record = new SetDataRequest(path, new byte[0], -1);
+                break;
+            case ZooDefs.OpCode.delete:
+                record = new DeleteRequest(path, -1);
+                break;
+            default:
+                record = new DeleteRequest(path, -1);
+                break;
+        }
+
+        return createRequest(record, opCode);
+    }
+
+    private Request createRequest(long sessionId, int opCode) {
+        return new Request(null, sessionId, 0, opCode, null, null);
+    }
+
+    @Test
+    public void testPrepRequestProcessorMetrics() throws Exception {
+        CountDownLatch threeRequests = new CountDownLatch(3);
+        doAnswer(invocationOnMock -> {
+            threeRequests.countDown();
+            return  null;}).when(nextProcessor).processRequest(any(Request.class));
+
+        PrepRequestProcessor prepRequestProcessor = new PrepRequestProcessor(zks, nextProcessor);
+        PrepRequestProcessor.skipACL = true;
+
+        //setData will generate one change
+        prepRequestProcessor.processRequest(createRequest("/foo", ZooDefs.OpCode.setData));
+        //delete will generate two changes, one for itself, one for its parent
+        prepRequestProcessor.processRequest(createRequest("/foo/bar", ZooDefs.OpCode.delete));
+        //mocking two ephemeral nodes exists for this session so two changes
+        prepRequestProcessor.processRequest(createRequest(2, ZooDefs.OpCode.closeSession));
+
+        Map<String, Object> values = ServerMetrics.getAllValues();
+        Assert.assertEquals(3L, values.get("prep_processor_request_queued"));
+
+        prepRequestProcessor.start();
+
+        threeRequests.await(500, TimeUnit.MILLISECONDS);
+
+        values = ServerMetrics.getAllValues();
+        Assert.assertEquals(3L, values.get("max_prep_processor_queue_size"));
+
+        Assert.assertThat((long)values.get("min_prep_processor_queue_time_ms"), greaterThan(0l));
+        Assert.assertEquals(3L, values.get("cnt_prep_processor_queue_time_ms"));
+
+        Assert.assertEquals(3L, values.get("cnt_prep_process_time"));
+        Assert.assertThat((long)values.get("max_prep_process_time"), greaterThan(0l));
+
+        Assert.assertEquals(1L, values.get("cnt_close_session_prep_time"));
+        Assert.assertThat((long)values.get("max_close_session_prep_time"), greaterThanOrEqualTo(0L));
+
+        Assert.assertEquals(5L, values.get("outstanding_changes_queued"));
+    }
+
+    private class SimpleWatcher implements Watcher {
+        CountDownLatch created;
+        public SimpleWatcher(CountDownLatch latch) {
+            this.created = latch;
+        }
+        @Override
+        public void process(WatchedEvent e) {
+            created.countDown();
+        }
+    }
+
+    @Test
+    public void testOutstandingChangesRemoved() throws Exception {
+        // this metric is currently recorded in FinalRequestProcessor but it is tightly related to the Prep metrics
+        QuorumUtil util = new QuorumUtil(1);
+        util.startAll();
+
+        ServerMetrics.resetAll();
+
+        ZooKeeper zk = ClientBase.createZKClient(util.getConnString());
+        zk.create("/test", new byte[50], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        CountDownLatch created = new CountDownLatch(1);
+        zk.exists("/test", new SimpleWatcher(created));
+        created.await(200, TimeUnit.MILLISECONDS);
+
+        Map<String, Object> values = ServerMetrics.getAllValues();
+        Assert.assertThat((long)values.get("outstanding_changes_removed"), greaterThan(0L));
+
+        util.shutdownAll();
+    }
+}


Mime
View raw message