zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eolive...@apache.org
Subject [zookeeper] branch master updated: ZOOKEEPER-3430: Observability improvement: provide top N read / write path queries.
Date Thu, 01 Aug 2019 05:28:30 GMT
This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d63602  ZOOKEEPER-3430: Observability improvement: provide top N read / write path queries.
6d63602 is described below

commit 6d636025d1c6a10840a27caee9e8933f1dbcbaf0
Author: Michael Han <lhan@twitter.com>
AuthorDate: Thu Aug 1 07:28:24 2019 +0200

    ZOOKEEPER-3430: Observability improvement: provide top N read / write path queries.
    
    We would like to have a better understanding of the type of workloads hit ZK, and one aspect of such understanding is to be able to answer queries of top N read and top N write request path. Knowing the hot request paths will allow us better optimize for such workloads, for example, enabling path specific caching, or change the path structure (e.g. break a long path to hierarchical paths).
    
    This commit adds a RequestPathMetricsCollector which will collect path requests stats and answers such queries. There are also a set of new Java system properties that make this collector highly configurable, including disable it totally (the default option is on.).
    
    The stats can also be queried through commands, which I will add in a different pull request once this landed, since this one is already big.
    
    Author: Michael Han <lhan@twitter.com>
    
    Reviewers: Enrico Olivelli <eolivelli@apache.org>
    
    Closes #989 from hanm/twitter/065e164bfbbf467a28602da76e9443faacaec1c8
---
 .../zookeeper/server/FinalRequestProcessor.java    |  25 +-
 .../java/org/apache/zookeeper/server/Request.java  |   2 +-
 .../apache/zookeeper/server/ZooKeeperServer.java   |  11 +
 .../server/util/RequestPathMetricsCollector.java   | 386 +++++++++++++++++
 .../util/RequestPathMetricsCollectorTest.java      | 464 +++++++++++++++++++++
 5 files changed, 881 insertions(+), 7 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
index cd0c0eb..d210d70 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
@@ -75,15 +75,11 @@ import org.apache.zookeeper.proto.SyncResponse;
 import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
 import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
 import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer;
+import org.apache.zookeeper.server.util.RequestPathMetricsCollector;
+
 import org.apache.zookeeper.txn.ErrorTxn;
 import org.apache.zookeeper.txn.TxnHeader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Locale;
 import java.util.Set;
 
@@ -99,10 +95,13 @@ import java.util.Set;
 public class FinalRequestProcessor implements RequestProcessor {
     private static final Logger LOG = LoggerFactory.getLogger(FinalRequestProcessor.class);
 
+    private final RequestPathMetricsCollector requestPathMetricsCollector;
+
     ZooKeeperServer zks;
 
     public FinalRequestProcessor(ZooKeeperServer zks) {
         this.zks = zks;
+        this.requestPathMetricsCollector = zks.getRequestPathMetricsCollector();
     }
 
     public void processRequest(Request request) {
@@ -312,6 +311,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                 lastOp = "CREA";
                 rsp = new CreateResponse(rc.path);
                 err = Code.get(rc.err);
+                requestPathMetricsCollector.registerRequest(request.type, rc.path);
                 break;
             }
             case OpCode.create2:
@@ -320,18 +320,21 @@ public class FinalRequestProcessor implements RequestProcessor {
                 lastOp = "CREA";
                 rsp = new Create2Response(rc.path, rc.stat);
                 err = Code.get(rc.err);
+                requestPathMetricsCollector.registerRequest(request.type, rc.path);
                 break;
             }
             case OpCode.delete:
             case OpCode.deleteContainer: {
                 lastOp = "DELE";
                 err = Code.get(rc.err);
+                requestPathMetricsCollector.registerRequest(request.type, rc.path);
                 break;
             }
             case OpCode.setData: {
                 lastOp = "SETD";
                 rsp = new SetDataResponse(rc.stat);
                 err = Code.get(rc.err);
+                requestPathMetricsCollector.registerRequest(request.type, rc.path);
                 break;
             }
             case OpCode.reconfig: {
@@ -344,6 +347,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                 lastOp = "SETA";
                 rsp = new SetACLResponse(rc.stat);
                 err = Code.get(rc.err);
+                requestPathMetricsCollector.registerRequest(request.type, rc.path);
                 break;
             }
             case OpCode.closeSession: {
@@ -357,6 +361,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                 ByteBufferInputStream.byteBuffer2Record(request.request,
                         syncRequest);
                 rsp = new SyncResponse(syncRequest.getPath());
+                requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath());
                 break;
             }
             case OpCode.check: {
@@ -378,6 +383,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                 Stat stat = zks.getZKDatabase().statNode(path, existsRequest
                         .getWatch() ? cnxn : null);
                 rsp = new ExistsResponse(stat);
+                requestPathMetricsCollector.registerRequest(request.type, path);
                 break;
             }
             case OpCode.getData: {
@@ -387,6 +393,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                         getDataRequest);
                 path = getDataRequest.getPath();
                 rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
+                requestPathMetricsCollector.registerRequest(request.type, path);
                 break;
             }
             case OpCode.setWatches: {
@@ -419,6 +426,8 @@ public class FinalRequestProcessor implements RequestProcessor {
                 Stat stat = new Stat();
                 List<ACL> acl =
                         zks.getZKDatabase().getACL(path, stat);
+                requestPathMetricsCollector.registerRequest(request.type, getACLRequest.getPath());
+
                 try {
                     PrepRequestProcessor.checkACL(zks, request.cnxn, zks.getZKDatabase().aclForNode(n),
                             ZooDefs.Perms.ADMIN,
@@ -446,6 +455,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                         getChildrenRequest);
                 path = getChildrenRequest.getPath();
                 rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo);
+                requestPathMetricsCollector.registerRequest(request.type, path);
                 break;
             }
             case OpCode.getAllChildrenNumber: {
@@ -484,6 +494,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                         path, stat, getChildren2Request
                                 .getWatch() ? cnxn : null);
                 rsp = new GetChildren2Response(children, stat);
+                requestPathMetricsCollector.registerRequest(request.type, path);
                 break;
             }
             case OpCode.checkWatches: {
@@ -500,6 +511,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                             path, type);
                     throw new KeeperException.NoWatcherException(msg);
                 }
+                requestPathMetricsCollector.registerRequest(request.type, checkWatches.getPath());
                 break;
             }
             case OpCode.removeWatches: {
@@ -516,6 +528,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                             path, type);
                     throw new KeeperException.NoWatcherException(msg);
                 }
+                requestPathMetricsCollector.registerRequest(request.type, removeWatches.getPath());
                 break;
             }
             case OpCode.getEphemerals: {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index e4fdb4d..769fc87 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -278,7 +278,7 @@ public class Request {
         }
     }
 
-    static String op2String(int op) {
+    public static String op2String(int op) {
         switch (op) {
         case OpCode.notification:
             return "notification";
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 60ab655..b74507b 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -73,6 +73,7 @@ import org.apache.zookeeper.server.auth.ProviderRegistry;
 import org.apache.zookeeper.server.auth.ServerAuthenticationProvider;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
+import org.apache.zookeeper.server.util.RequestPathMetricsCollector;
 import org.apache.zookeeper.server.util.JvmPauseMonitor;
 import org.apache.zookeeper.server.util.OSMXBean;
 import org.apache.zookeeper.txn.CreateSessionTxn;
@@ -124,6 +125,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     private boolean isResponseCachingEnabled = true;
     /* contains the configuration file content read at startup */
     protected String initialConfig;
+    private final RequestPathMetricsCollector requestPathMetricsCollector;
 
     protected enum State {
         INITIAL, RUNNING, SHUTDOWN, ERROR
@@ -209,6 +211,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     public ZooKeeperServer() {
         listener = new ZooKeeperServerListenerImpl(this);
         serverStats = new ServerStats(this);
+        this.requestPathMetricsCollector = new RequestPathMetricsCollector();
     }
 
     /**
@@ -236,6 +239,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
 
         this.initialConfig = initialConfig;
 
+        this.requestPathMetricsCollector = new RequestPathMetricsCollector();
+
         LOG.info("Created server with tickTime " + tickTime
                 + " minSessionTimeout " + getMinSessionTimeout()
                 + " maxSessionTimeout " + getMaxSessionTimeout()
@@ -279,6 +284,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return serverStats;
     }
 
+    public RequestPathMetricsCollector getRequestPathMetricsCollector() {
+        return requestPathMetricsCollector;
+    }
+
     public BlueThrottle connThrottle() {
         return connThrottle;
     }
@@ -576,6 +585,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         registerMetrics();
 
         setState(State.RUNNING);
+        requestPathMetricsCollector.start();
         notifyAll();
     }
 
@@ -727,6 +737,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             }
         }
 
+        requestPathMetricsCollector.shutdown();
         unregisterJMX();
     }
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RequestPathMetricsCollector.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RequestPathMetricsCollector.java
new file mode 100644
index 0000000..c5baac0
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RequestPathMetricsCollector.java
@@ -0,0 +1,386 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringTokenizer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.server.Request;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.zookeeper.ZooDefs.OpCode.checkWatches;
+import static org.apache.zookeeper.ZooDefs.OpCode.create;
+import static org.apache.zookeeper.ZooDefs.OpCode.create2;
+import static org.apache.zookeeper.ZooDefs.OpCode.createContainer;
+import static org.apache.zookeeper.ZooDefs.OpCode.delete;
+import static org.apache.zookeeper.ZooDefs.OpCode.deleteContainer;
+import static org.apache.zookeeper.ZooDefs.OpCode.exists;
+import static org.apache.zookeeper.ZooDefs.OpCode.getACL;
+import static org.apache.zookeeper.ZooDefs.OpCode.getChildren;
+import static org.apache.zookeeper.ZooDefs.OpCode.getChildren2;
+import static org.apache.zookeeper.ZooDefs.OpCode.getData;
+import static org.apache.zookeeper.ZooDefs.OpCode.removeWatches;
+import static org.apache.zookeeper.ZooDefs.OpCode.setACL;
+import static org.apache.zookeeper.ZooDefs.OpCode.setData;
+import static org.apache.zookeeper.ZooDefs.OpCode.sync;
+
+/**
+ * This class holds the requests path ( up till a certain depth) stats per request type
+ */
+public class RequestPathMetricsCollector {
+    private static final Logger LOG = LoggerFactory.getLogger(RequestPathMetricsCollector.class);
+    // How many seconds does each slot represent, default is 15 seconds.
+    private final int REQUEST_STATS_SLOT_DURATION;
+    // How many slots we keep, default is 60 so it's 15 minutes total history.
+    private final int REQUEST_STATS_SLOT_CAPACITY;
+    // How far down the path we keep, default is 6.
+    private final int REQUEST_PREPROCESS_PATH_DEPTH;
+    // Sample rate, default is 0.1 (10%).
+    private final float REQUEST_PREPROCESS_SAMPLE_RATE;
+    private final long COLLECTOR_INITIAL_DELAY;
+    private final long COLLECTOR_DELAY;
+    private final int REQUEST_PREPROCESS_TOPPATH_MAX;
+    private final boolean enabled;
+
+    public static final String PATH_STATS_SLOT_CAPACITY = "zookeeper.pathStats.slotCapacity";
+    public static final String PATH_STATS_SLOT_DURATION = "zookeeper.pathStats.slotDuration";
+    public static final String PATH_STATS_MAX_DEPTH = "zookeeper.pathStats.maxDepth";
+    public static final String PATH_STATS_SAMPLE_RATE = "zookeeper.pathStats.sampleRate";
+    public static final String PATH_STATS_COLLECTOR_INITIAL_DELAY = "zookeeper.pathStats.initialDelay";
+    public static final String PATH_STATS_COLLECTOR_DELAY = "zookeeper.pathStats.delay";
+    public static final String PATH_STATS_TOP_PATH_MAX = "zookeeper.pathStats.topPathMax";
+    public static final String PATH_STATS_ENABLED = "zookeeper.pathStats.enabled";
+    private static final String PATH_SEPERATOR = "/";
+
+    private final Map<String, PathStatsQueue> immutableRequestsMap;
+    private final Random sampler;
+    private final ScheduledThreadPoolExecutor scheduledExecutor;
+    private final boolean accurateMode;
+
+    public RequestPathMetricsCollector() {
+        this(false);
+    }
+    
+    public RequestPathMetricsCollector(boolean accurateMode) {
+        final Map<String, PathStatsQueue> requestsMap = new HashMap<>();
+        this.sampler = new Random(System.currentTimeMillis());
+        this.accurateMode = accurateMode;
+
+        REQUEST_PREPROCESS_TOPPATH_MAX = Integer.getInteger(PATH_STATS_TOP_PATH_MAX, 20);
+        REQUEST_STATS_SLOT_DURATION = Integer.getInteger(PATH_STATS_SLOT_DURATION, 15);
+        REQUEST_STATS_SLOT_CAPACITY = Integer.getInteger(PATH_STATS_SLOT_CAPACITY, 60);
+        REQUEST_PREPROCESS_PATH_DEPTH = Integer.getInteger(PATH_STATS_MAX_DEPTH, 6);
+        REQUEST_PREPROCESS_SAMPLE_RATE = Float.parseFloat(
+            System.getProperty(PATH_STATS_SAMPLE_RATE, "0.1"));
+        COLLECTOR_INITIAL_DELAY = Long.getLong(PATH_STATS_COLLECTOR_INITIAL_DELAY, 5);
+        COLLECTOR_DELAY = Long.getLong(PATH_STATS_COLLECTOR_DELAY, 5);
+        enabled = Boolean.getBoolean(PATH_STATS_ENABLED);
+
+        LOG.info("{} = {}", PATH_STATS_SLOT_CAPACITY, REQUEST_STATS_SLOT_CAPACITY);
+        LOG.info("{} = {}", PATH_STATS_SLOT_DURATION, REQUEST_STATS_SLOT_DURATION);
+        LOG.info("{} = {}", PATH_STATS_MAX_DEPTH, REQUEST_PREPROCESS_PATH_DEPTH);
+        LOG.info("{} = {}", PATH_STATS_COLLECTOR_INITIAL_DELAY, COLLECTOR_INITIAL_DELAY);
+        LOG.info("{} = {}", PATH_STATS_COLLECTOR_DELAY, COLLECTOR_DELAY);
+        LOG.info("{} = {}", PATH_STATS_ENABLED, enabled);
+
+        this.scheduledExecutor = (ScheduledThreadPoolExecutor) Executors
+            .newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
+        scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+        scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        requestsMap.put(Request.op2String(create), new PathStatsQueue(create));
+        requestsMap.put(Request.op2String(create2), new PathStatsQueue(create2));
+        requestsMap.put(Request.op2String(createContainer), new PathStatsQueue(createContainer));
+        requestsMap.put(Request.op2String(deleteContainer), new PathStatsQueue(deleteContainer));
+        requestsMap.put(Request.op2String(delete), new PathStatsQueue(delete));
+        requestsMap.put(Request.op2String(exists), new PathStatsQueue(exists));
+        requestsMap.put(Request.op2String(setData), new PathStatsQueue(setData));
+        requestsMap.put(Request.op2String(getData), new PathStatsQueue(getData));
+        requestsMap.put(Request.op2String(getACL), new PathStatsQueue(getACL));
+        requestsMap.put(Request.op2String(setACL), new PathStatsQueue(setACL));
+        requestsMap.put(Request.op2String(getChildren), new PathStatsQueue(getChildren));
+        requestsMap.put(Request.op2String(getChildren2), new PathStatsQueue(getChildren2));
+        requestsMap.put(Request.op2String(checkWatches), new PathStatsQueue(checkWatches));
+        requestsMap.put(Request.op2String(removeWatches), new PathStatsQueue(removeWatches));
+        requestsMap.put(Request.op2String(sync), new PathStatsQueue(sync));
+        this.immutableRequestsMap = java.util.Collections.unmodifiableMap(requestsMap);
+    }
+
+    static boolean isWriteOp(int requestType) {
+        switch (requestType) {
+            case ZooDefs.OpCode.sync:
+            case ZooDefs.OpCode.create:
+            case ZooDefs.OpCode.create2:
+            case ZooDefs.OpCode.createContainer:
+            case ZooDefs.OpCode.delete:
+            case ZooDefs.OpCode.deleteContainer:
+            case ZooDefs.OpCode.setData:
+            case ZooDefs.OpCode.reconfig:
+            case ZooDefs.OpCode.setACL:
+            case ZooDefs.OpCode.multi:
+            case ZooDefs.OpCode.check:
+                return true;
+        }
+        return false;
+    }
+
+    static String trimPathDepth(String path, int maxDepth) {
+        int count = 0;
+        StringBuilder sb = new StringBuilder();
+        StringTokenizer pathTokenizer = new StringTokenizer(path, PATH_SEPERATOR);
+        while (pathTokenizer.hasMoreElements() && count++ < maxDepth) {
+            sb.append(PATH_SEPERATOR);
+            sb.append(pathTokenizer.nextToken());
+        }
+        path = sb.toString();
+        return path;
+    }
+
+    public void shutdown(){
+        if (!enabled) return;
+
+        LOG.info("shutdown scheduledExecutor");
+        scheduledExecutor.shutdownNow();
+    }
+    
+    public void start() {
+        if (!enabled) return;
+
+        LOG.info("Start the RequestPath collector");
+        immutableRequestsMap.forEach((opType, pathStatsQueue) -> pathStatsQueue.start());
+        
+        // Schedule to log the top used read/write paths every 5 mins
+        scheduledExecutor.scheduleWithFixedDelay(() -> {
+            LOG.info("%nHere are the top Read paths:");
+            logTopPaths(aggregatePaths(4, queue -> !queue.isWriteOperation()),
+                entry -> LOG.info(entry.getKey() + " : " + entry.getValue()));
+            LOG.info("%nHere are the top Write paths:");
+            logTopPaths(aggregatePaths(4, queue -> queue.isWriteOperation()),
+                entry -> LOG.info(entry.getKey() + " : " + entry.getValue()));
+        }, COLLECTOR_INITIAL_DELAY, COLLECTOR_DELAY, TimeUnit.MINUTES);
+    }
+
+    /**
+     * The public interface of the buffer. FinalRequestHandler will call into this for
+     * each request that has a path and this needs to be fast. we sample the path so that
+     * we don't have to store too many paths in memory
+     */
+    public void registerRequest(int type, String path) {
+        if (!enabled) return;
+        if (sampler.nextFloat() <= REQUEST_PREPROCESS_SAMPLE_RATE) {
+            PathStatsQueue pathStatsQueue = immutableRequestsMap.get(Request.op2String(type));
+            if (pathStatsQueue != null) {
+                pathStatsQueue.registerRequest(path);
+            } else {
+                LOG.error("We should not handle {}", type);
+            }
+        }
+    }
+
+    public void dumpTopRequestPath(PrintWriter pwriter, String requestTypeName, int queryMaxDepth) {
+        if (queryMaxDepth < 1) {
+            return;
+        }
+        PathStatsQueue pathStatsQueue = immutableRequestsMap.get(requestTypeName);
+        if (pathStatsQueue == null) {
+            pwriter.println("Can not find path stats for type: " + requestTypeName);
+            return;
+        } else {
+            pwriter.println("The top requests of type: " + requestTypeName);
+        }
+        Map<String, Integer> combinedMap;
+        final int maxDepth = Math.min(queryMaxDepth, REQUEST_PREPROCESS_PATH_DEPTH);
+        combinedMap = pathStatsQueue.collectStats(maxDepth);
+        logTopPaths(combinedMap, entry -> pwriter.println(entry.getKey() + " : " + entry.getValue()));
+    }
+
+    public void dumpTopReadPaths(PrintWriter pwriter, int queryMaxDepth) {
+        pwriter.println("The top read requests are");
+        dumpTopAggregatedPaths(pwriter, queryMaxDepth, queue -> !queue.isWriteOperation);
+    }
+
+    public void dumpTopWritePaths(PrintWriter pwriter, int queryMaxDepth) {
+        pwriter.println("The top write requests are");
+        dumpTopAggregatedPaths(pwriter, queryMaxDepth, queue -> queue.isWriteOperation);
+    }
+
+    public void dumpTopPaths(PrintWriter pwriter, int queryMaxDepth) {
+        pwriter.println("The top requests are");
+        dumpTopAggregatedPaths(pwriter, queryMaxDepth, queue -> true);
+    }
+
+    /**
+     * Combine all the path Stats Queue that matches the predicate together
+     * and then write to the pwriter
+     */
+    private void dumpTopAggregatedPaths(
+        PrintWriter pwriter, int queryMaxDepth, final Predicate<PathStatsQueue> predicate) {
+        if (!enabled) return;
+        final Map<String, Integer> combinedMap = aggregatePaths(queryMaxDepth, predicate);
+        logTopPaths(combinedMap, entry -> pwriter.println(entry.getKey() + " : " + entry.getValue()));
+    }
+
+    Map<String, Integer> aggregatePaths(int queryMaxDepth, Predicate<PathStatsQueue> predicate) {
+        final Map<String, Integer> combinedMap = new HashMap<>(REQUEST_PREPROCESS_TOPPATH_MAX);
+        final int maxDepth = Math.min(queryMaxDepth, REQUEST_PREPROCESS_PATH_DEPTH);
+        immutableRequestsMap.values().stream()
+            .filter(predicate)
+            .forEach(pathStatsQueue ->
+                pathStatsQueue.collectStats(maxDepth)
+                    .forEach((path, count) ->
+                        combinedMap.put(path, combinedMap.getOrDefault(path, 0) + count)));
+        return combinedMap;
+    }
+
+    void logTopPaths(Map<String, Integer> combinedMap, final Consumer<Map.Entry<String, Integer>> output) {
+        combinedMap.entrySet().stream()
+            .sorted(Comparator.comparing(Map.Entry<String, Integer>::getValue).reversed())//sort by path count
+            .limit(REQUEST_PREPROCESS_TOPPATH_MAX)
+            .forEach(output);
+    }
+
+    class PathStatsQueue {
+
+        private final String requestTypeName;
+        private final AtomicReference<ConcurrentLinkedQueue<String>> currentSlot;
+        private final LinkedBlockingQueue<Map<String, Integer>> requestPathStats;
+        private final boolean isWriteOperation;
+        
+        public PathStatsQueue(int requestType) {
+            this.requestTypeName = Request.op2String(requestType);
+            this.isWriteOperation = isWriteOp(requestType);
+            requestPathStats = new LinkedBlockingQueue<>(REQUEST_STATS_SLOT_CAPACITY);
+            currentSlot = new AtomicReference<>(new ConcurrentLinkedQueue<>());
+        }
+
+        /*
+         * The only write entry into this class, need to be fast.
+         * Just queue up the path to the current slot queue locking free.
+         */
+        public void registerRequest(String path) {
+            if (!enabled) return;
+            currentSlot.get().offer(path);
+        }
+
+        ConcurrentLinkedQueue<String> getCurrentSlot() {
+            return currentSlot.get();
+        }
+
+        /**
+         * Helper function to MR the paths in the queue to map with count
+         * 1. cut each path up to max depth
+         * 2. aggregate the paths based on its count
+         *
+         * @param tobeProcessedSlot queue of paths called
+         * @return a map containing aggregated path in the queue
+         */
+        Map<String, Integer> mapReducePaths(int maxDepth, Collection<String> tobeProcessedSlot) {
+            Map<String, Integer> newSlot = new ConcurrentHashMap<>();
+            tobeProcessedSlot.stream()
+                .filter(path -> path != null)
+                .forEach((path) -> {
+                    path = trimPathDepth(path, maxDepth);
+                    newSlot.put(path, newSlot.getOrDefault(path, 0) + 1);
+                });
+            return newSlot;
+        }
+
+        /**
+         * The only read point of this class
+         *
+         * @return the aggregated path to count map
+         */
+        public Map<String, Integer> collectStats(int maxDepth) {
+            Map<String, Integer> combinedMap;
+            // Take a snapshot of the current slot and convert it to map.
+            // Set the initial size as 0 since we don't want it to padding nulls in the end.
+            Map<String, Integer> snapShot = mapReducePaths(maxDepth,
+                Arrays.asList(currentSlot.get().toArray(new String[0])));
+            // Starting from the snapshot and go through the queue to reduce them into one map
+            // the iterator can run concurrently with write but we want to use a real lock in the test
+            synchronized (accurateMode ? requestPathStats : new Object()) {
+                combinedMap =
+                    requestPathStats.stream().reduce(snapShot, (firstMap, secondMap) -> {
+                        secondMap.forEach((key, value) -> {
+                                String trimmedPath = trimPathDepth(key, maxDepth);
+                                firstMap.put(trimmedPath, firstMap.getOrDefault(trimmedPath, 0) + value);
+                            }
+                        );
+                        return firstMap;
+                    });
+            }
+            return combinedMap;
+        }
+
+        /**
+         * Start to schedule the pre-processing of the current slot
+         */
+        public void start() {
+            if (!enabled) return;
+            // Staggered start and then run every 15 seconds no matter what
+            int delay = sampler.nextInt(REQUEST_STATS_SLOT_DURATION);
+            // We need to use fixed Delay as the fixed rate will start the next one right
+            // after the previous one finishes if it runs overtime instead of overlapping it.
+            scheduledExecutor.scheduleWithFixedDelay(() -> {
+                // Generate new slot so new requests will go here.
+                ConcurrentLinkedQueue<String> tobeProcessedSlot =
+                    currentSlot.getAndSet(new ConcurrentLinkedQueue<>());
+                try {
+                    // pre process the last slot and queue it up, only one thread scheduled modified
+                    // this but we can mess up the collect part so we put a lock in the test.
+                    Map<String, Integer> latestSlot =
+                        mapReducePaths(REQUEST_PREPROCESS_PATH_DEPTH, tobeProcessedSlot);
+                    synchronized (accurateMode ? requestPathStats : new Object()) {
+                        if (requestPathStats.remainingCapacity() <= 0) {
+                            requestPathStats.poll();
+                        }
+                        if (!requestPathStats.offer(latestSlot)) {
+                            LOG.error("Failed to insert the new request path stats for {}", requestTypeName);
+                        }
+                    }
+                } catch (Exception e) {
+                    LOG.error("Failed to insert the new request path stats for {} with exception {}",
+                        requestTypeName, e);
+                }
+            }, delay, REQUEST_STATS_SLOT_DURATION, TimeUnit.SECONDS);
+        }
+
+        boolean isWriteOperation() {
+            return isWriteOperation;
+        }
+    }
+}
+
+
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/RequestPathMetricsCollectorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/RequestPathMetricsCollectorTest.java
new file mode 100644
index 0000000..1ed8a4e
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/RequestPathMetricsCollectorTest.java
@@ -0,0 +1,464 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.Assert;
+
+import static org.apache.zookeeper.ZooDefs.OpCode.create;
+import static org.apache.zookeeper.ZooDefs.OpCode.create2;
+import static org.apache.zookeeper.ZooDefs.OpCode.delete;
+import static org.apache.zookeeper.ZooDefs.OpCode.exists;
+import static org.apache.zookeeper.ZooDefs.OpCode.getChildren;
+import static org.apache.zookeeper.ZooDefs.OpCode.getChildren2;
+import static org.apache.zookeeper.ZooDefs.OpCode.getData;
+import static org.apache.zookeeper.ZooDefs.OpCode.setData;
+
+public class RequestPathMetricsCollectorTest {
+
+    @Before
+    public void setUp() {
+        System.setProperty("zookeeper.pathStats.enabled", "true");
+        System.setProperty("zookeeper.pathStats.slotCapacity", "60");
+        System.setProperty("zookeeper.pathStats.slotDuration", "1");
+        System.setProperty("zookeeper.pathStats.maxDepth", "6");
+        System.setProperty("zookeeper.pathStats.sampleRate", "1.0");
+    }
+
+    @After
+    public void tearDown() {
+        System.clearProperty("zookeeper.pathStats.enabled");
+        System.clearProperty("zookeeper.pathStats.slotCapacity");
+        System.clearProperty("zookeeper.pathStats.slotDuration");
+        System.clearProperty("zookeeper.pathStats.maxDepth");
+        System.clearProperty("zookeeper.pathStats.sampleRate");
+    }
+
+    @Test
+    public void testTrimPath() {
+        //normal cases
+        String trimedPath = RequestPathMetricsCollector.trimPathDepth("/p1/p2/p3", 1);
+        Assert.assertTrue(trimedPath.equalsIgnoreCase("/p1"));
+        trimedPath = RequestPathMetricsCollector.trimPathDepth("/p1/p2/p3", 2);
+        Assert.assertTrue(trimedPath.equalsIgnoreCase("/p1/p2"));
+        trimedPath = RequestPathMetricsCollector.trimPathDepth("/p1/p2/p3", 3);
+        Assert.assertTrue(trimedPath.equalsIgnoreCase("/p1/p2/p3"));
+        trimedPath = RequestPathMetricsCollector.trimPathDepth("/p1/p2/p3", 4);
+        Assert.assertTrue(trimedPath.equalsIgnoreCase("/p1/p2/p3"));
+        //some extra symbols
+        trimedPath = RequestPathMetricsCollector.trimPathDepth("//p1 next/p2.index/p3:next",
+            3);
+        Assert.assertTrue(trimedPath.equalsIgnoreCase("/p1 next/p2.index/p3:next"));
+        trimedPath = RequestPathMetricsCollector.trimPathDepth("//p1 next/p2.index/p3:next",
+            2);
+        Assert.assertTrue(trimedPath.equalsIgnoreCase("/p1 next/p2.index"));
+        trimedPath = RequestPathMetricsCollector.trimPathDepth("//p1 next/p2.index/p3:next",
+            6);
+        Assert.assertTrue(trimedPath.equalsIgnoreCase("/p1 next/p2.index/p3:next"));
+    }
+
+    @Test
+    public void testQueueMapReduce() throws InterruptedException {
+        RequestPathMetricsCollector requestPathMetricsCollector = new RequestPathMetricsCollector();
+        RequestPathMetricsCollector.PathStatsQueue pathStatsQueue =
+            requestPathMetricsCollector.new PathStatsQueue(create2);
+        Thread path7 = new Thread(() -> {
+            for (int i = 0; i < 1000000; i++) {
+                pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5/path6/path7" +
+                    "_" + i);
+            }
+        });
+        path7.start();
+        Thread path6 = new Thread(() -> {
+            pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5/path6");
+            for(int i = 1; i < 100000; i++) {
+                pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5/path6"+"_"+i);
+            }
+        });
+        path6.start();
+        for(int i = 0; i < 1; i++) {
+            pathStatsQueue.registerRequest("/path1");
+        }
+        for(int i = 0; i < 10; i++) {
+            pathStatsQueue.registerRequest("/path1/path2"+"_"+i);
+        }
+        for(int i = 0; i < 100; i++) {
+            pathStatsQueue.registerRequest("/path1/path2/path3"+"_"+i);
+        }
+        for(int i = 0; i < 1000; i++) {
+            pathStatsQueue.registerRequest("/path1/path2/path3/path4"+"_"+i);
+        }
+        for(int i = 0; i < 10000; i++) {
+            pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5"+"_"+i);
+        }
+        path6.join();
+        path7.join();
+        Map<String, Integer> newSlot = pathStatsQueue.mapReducePaths(1,
+            pathStatsQueue.getCurrentSlot());
+        Assert.assertTrue(newSlot.size() == 1);
+        Assert.assertTrue(newSlot.get("/path1").compareTo(1111111) == 0);
+        //cut up to 2
+        newSlot = pathStatsQueue.mapReducePaths(2, pathStatsQueue.getCurrentSlot());
+        Assert.assertTrue(newSlot.size() == 12);
+        Assert.assertTrue(newSlot.get("/path1").compareTo(1) == 0);
+        Assert.assertTrue(newSlot.get("/path1/path2").compareTo(1111100) == 0);
+        //cut up to 3
+        newSlot = pathStatsQueue.mapReducePaths(3, pathStatsQueue.getCurrentSlot());
+        Assert.assertTrue(newSlot.size() == 112);
+        Assert.assertTrue(newSlot.get("/path1").compareTo(1) == 0);
+        Assert.assertTrue(newSlot.get("/path1/path2/path3").compareTo(1111000) == 0);
+        //cut up to 4
+        newSlot = pathStatsQueue.mapReducePaths(4, pathStatsQueue.getCurrentSlot());
+        Assert.assertTrue(newSlot.size() == 1112);
+        Assert.assertTrue(newSlot.get("/path1/path2/path3/path4").compareTo(1110000) == 0);
+        //cut up to 5
+        newSlot = pathStatsQueue.mapReducePaths(5, pathStatsQueue.getCurrentSlot());
+        Assert.assertTrue(newSlot.size() == 11112);
+        Assert.assertTrue(newSlot.get("/path1/path2/path3/path4/path5").compareTo(1100000) == 0);
+        //cut up to 6
+        newSlot = pathStatsQueue.mapReducePaths(6, pathStatsQueue.getCurrentSlot());
+        Assert.assertTrue(newSlot.size() == 111111);
+        Assert.assertTrue(newSlot.get("/path1/path2/path3/path4/path5/path6").compareTo(1000001) == 0);
+        //cut up to 7
+        newSlot = pathStatsQueue.mapReducePaths(7, pathStatsQueue.getCurrentSlot());
+        Assert.assertTrue(newSlot.size() == 1111111);
+    }
+
+    @Test
+    public void testCollectEmptyStats() throws InterruptedException {
+        RequestPathMetricsCollector requestPathMetricsCollector = new RequestPathMetricsCollector();
+        RequestPathMetricsCollector.PathStatsQueue pathStatsQueue =
+            requestPathMetricsCollector.new PathStatsQueue(getChildren);
+        Thread.sleep(5000);
+        Map<String, Integer> newSlot = pathStatsQueue.mapReducePaths(3,
+            pathStatsQueue.getCurrentSlot());
+        Assert.assertTrue(newSlot.isEmpty());
+        pathStatsQueue.start();
+        Thread.sleep(15000);
+        newSlot = pathStatsQueue.collectStats(1);
+        Assert.assertTrue(newSlot.size() == 0);
+        newSlot = pathStatsQueue.collectStats(2);
+        Assert.assertTrue(newSlot.size() == 0);
+        newSlot = pathStatsQueue.collectStats(5);
+        Assert.assertTrue(newSlot.size() == 0);
+    }
+
+    @Test
+    public void testCollectStats() throws InterruptedException {
+        RequestPathMetricsCollector requestPathMetricsCollector =
+            new RequestPathMetricsCollector(true);
+        RequestPathMetricsCollector.PathStatsQueue pathStatsQueue =
+            requestPathMetricsCollector.new PathStatsQueue(getChildren);
+        pathStatsQueue.start();
+        Thread path7 = new Thread(() -> {
+            for(int i = 0; i < 10; i++) {
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                for (int j = 0; j < 100000; j++) {
+                    pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5/path6/path7"
+                        + "_" + i + "_" + j);
+                }
+            }
+        });
+        path7.start();
+        Thread path6 = new Thread(() -> {
+            pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5/path6");
+            for(int i = 0; i < 10; i++) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                for (int j = 0; j < 10000; j++) {
+                    pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5/path6"
+                        + "_" + i + "_" + j);
+                }
+            }
+        });
+        path6.start();
+        for(int i = 0; i < 1; i++) {
+            pathStatsQueue.registerRequest("/path1");
+        }        
+        for(int i = 0; i < 10; i++) {
+            pathStatsQueue.registerRequest("/path1/path2"+"_"+i);
+        }
+        for(int i = 0; i < 100; i++) {
+            pathStatsQueue.registerRequest("/path1/path2/path3"+"_"+i);
+        }
+        for(int i = 0; i < 1000; i++) {
+            pathStatsQueue.registerRequest("/path1/path2/path3/path4"+"_"+i);
+        }
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        for(int i = 0; i < 10000; i++) {
+            pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5"+"_"+i);
+        }
+        path6.join();
+        path7.join();
+        Map<String, Integer> newSlot = pathStatsQueue.collectStats(1);
+        Assert.assertEquals(newSlot.size(), 1);
+        Assert.assertEquals(newSlot.get("/path1").intValue(), 1111112);
+        //cut up to 2
+        newSlot = pathStatsQueue.collectStats(2);
+        Assert.assertEquals(newSlot.size(), 12);
+        Assert.assertEquals(newSlot.get("/path1").intValue(), 1);
+        Assert.assertEquals(newSlot.get("/path1/path2").intValue(), 1111101);
+        //cut up to 3
+        newSlot = pathStatsQueue.collectStats(3);
+        Assert.assertEquals(newSlot.size(), 112);
+        Assert.assertEquals(newSlot.get("/path1").intValue(), 1);
+        Assert.assertEquals(newSlot.get("/path1/path2/path3").intValue(), 1111001);
+        //cut up to 4
+        newSlot = pathStatsQueue.collectStats(4);
+        Assert.assertEquals(newSlot.size(), 1112);
+        Assert.assertEquals(newSlot.get("/path1/path2/path3/path4").intValue(), 1110001);
+        //cut up to 5
+        newSlot = pathStatsQueue.collectStats(5);
+        Assert.assertEquals(newSlot.size(), 11112);
+        Assert.assertEquals(newSlot.get("/path1/path2/path3/path4/path5").intValue(), 1100001);
+        //cut up to 6
+        newSlot = pathStatsQueue.collectStats(6);
+        Assert.assertEquals(newSlot.size(), 111112);
+        Assert.assertEquals(newSlot.get("/path1/path2/path3/path4/path5/path6").intValue(),
+            1000001);
+    }
+    
+    @Test
+    public void testAggregate() throws InterruptedException {
+        RequestPathMetricsCollector requestPathMetricsCollector =
+            new RequestPathMetricsCollector(true);
+        Thread path7 = new Thread(() -> {
+            for(int i = 0; i < 10; i++) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                for (int j = 0; j < 100000; j++) {
+                    requestPathMetricsCollector.registerRequest(getData,
+                        "/path1/path2/path3/path4/path5/path6/path7" + "_" + i + "_" + j);
+                }
+            }
+        });
+        path7.start();
+        Thread path6 = new Thread(() -> {
+            requestPathMetricsCollector.registerRequest(getChildren2,
+                "/path1/path2/path3/path4/path5/path6");
+            for(int i = 0; i < 10; i++) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                for (int j = 0; j < 10000; j++) {
+                    requestPathMetricsCollector.registerRequest(getChildren,
+                        "/path1/path2/path3/path4/path5/path6" + "_" + i + "_" + j);
+                }
+            }
+        });
+        path6.start();
+        for(int i = 0; i < 1; i++) {
+            requestPathMetricsCollector.registerRequest(create2,
+                "/path1");
+        }
+        for(int i = 0; i < 10; i++) {
+            requestPathMetricsCollector.registerRequest(create,
+                "/path1/path2"+"_"+i);
+        }
+        for(int i = 0; i < 100; i++) {
+            requestPathMetricsCollector.registerRequest(delete,
+                "/path1/path2/path3"+"_"+i);
+        }
+        for(int i = 0; i < 1000; i++) {
+            requestPathMetricsCollector.registerRequest(setData,
+                "/path1/path2/path3/path4"+"_"+i);
+        }
+        for(int i = 0; i < 10000; i++) {
+            requestPathMetricsCollector.registerRequest(exists,
+                "/path1/path2/path3/path4/path5"+"_"+i);
+        }
+        path6.join();
+        path7.join();
+        Map<String, Integer> newSlot = requestPathMetricsCollector.aggregatePaths(2,
+            queue -> true);
+        Assert.assertEquals(newSlot.size(), 12);
+        Assert.assertEquals(newSlot.get("/path1").intValue(), 1);
+        Assert.assertEquals(newSlot.get("/path1/path2").intValue(), 1111101);
+        //cut up to 3
+        newSlot = requestPathMetricsCollector.aggregatePaths(3, queue -> true);
+        Assert.assertEquals(newSlot.size(), 112);
+        Assert.assertEquals(newSlot.get("/path1").intValue(), 1);
+        Assert.assertEquals(newSlot.get("/path1/path2/path3").intValue(), 1111001);
+        //cut up to 4
+        newSlot = requestPathMetricsCollector.aggregatePaths(4, queue -> true);
+        Assert.assertEquals(newSlot.size(), 1112);
+        Assert.assertEquals(newSlot.get("/path1/path2/path3/path4").intValue(), 1110001);
+        //cut up to 5
+        newSlot = requestPathMetricsCollector.aggregatePaths(5, queue -> true);
+        Assert.assertEquals(newSlot.size(), 11112);
+        Assert.assertEquals(newSlot.get("/path1/path2/path3/path4/path5").intValue(), 1100001);
+        //cut up to 6
+        newSlot = requestPathMetricsCollector.aggregatePaths(6, queue -> true);
+        Assert.assertEquals(newSlot.size(), 111112);
+        Assert.assertEquals(newSlot.get("/path1/path2/path3/path4/path5/path6").intValue(), 1000001);
+        //cut up to 7 but the initial mapReduce kept only 6
+        newSlot = requestPathMetricsCollector.aggregatePaths(7, queue -> true);
+        Assert.assertEquals(newSlot.size(), 111112);
+        Assert.assertEquals(newSlot.get("/path1/path2/path3/path4/path5/path6").intValue(), 1000001);
+        //test predicate
+        //cut up to 4 for all the reads
+        newSlot = requestPathMetricsCollector.aggregatePaths(4, queue -> !queue.isWriteOperation());
+        Assert.assertEquals(newSlot.size(), 1);
+        Assert.assertEquals(newSlot.get("/path1/path2/path3/path4").intValue(), 1110001);
+        //cut up to 4 for all the write
+        newSlot = requestPathMetricsCollector.aggregatePaths(4, queue -> queue.isWriteOperation());
+        Assert.assertEquals(newSlot.size(), 1111);
+        //cut up to 3 for all the write
+        newSlot = requestPathMetricsCollector.aggregatePaths(3, queue -> queue.isWriteOperation());
+        Assert.assertEquals(newSlot.size(), 112);
+        Assert.assertEquals(newSlot.get("/path1/path2/path3").intValue(), 1000);
+    }
+
+    @Test
+    public void testTopPath() throws InterruptedException {
+        RequestPathMetricsCollector requestPathMetricsCollector = new RequestPathMetricsCollector(true);
+        Thread path7 = new Thread(() -> {
+            for(int i = 0; i < 10; i++) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                for (int j = 0; j < 100000; j++) {
+                    requestPathMetricsCollector.registerRequest(getData,
+                        "/path1/path2/path3/path4/path5/path6/path7" + "_" + i + "_" + j);
+                }
+            }
+        });
+        path7.start();
+        Thread path6 = new Thread(() -> {
+            requestPathMetricsCollector.registerRequest(getChildren2,
+                "/path1/path2/path3/path4/path5/path6");
+            for(int i = 0; i < 10; i++) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                for (int j = 0; j < 10000; j++) {
+                    requestPathMetricsCollector.registerRequest(getChildren,
+                        "/path1/path2/path3/path4/path5/path6" + "_" + i + "_" + j);
+                }
+            }
+        });
+        path6.start();
+        for(int i = 0; i < 1; i++) {
+            requestPathMetricsCollector.registerRequest(create2,
+                "/path1");
+        }
+        for(int i = 0; i < 10; i++) {
+            requestPathMetricsCollector.registerRequest(create,
+                "/path1/path2"+"_"+i);
+        }
+        for(int i = 0; i < 100; i++) {
+            requestPathMetricsCollector.registerRequest(delete,
+                "/path1/path2/path3"+"_"+i);
+        }
+        for(int i = 0; i < 1000; i++) {
+            requestPathMetricsCollector.registerRequest(setData,
+                "/path1/path2/path3/path4"+"_"+i);
+        }
+        for(int i = 0; i < 10000; i++) {
+            requestPathMetricsCollector.registerRequest(exists,
+                "/path1/path2/path3/path4/path5"+"_"+i);
+        }
+        path6.join();
+        path7.join();
+        StringBuilder sb1= new StringBuilder();
+        Map<String, Integer> newSlot = requestPathMetricsCollector.aggregatePaths(3,
+            queue -> queue.isWriteOperation());
+        requestPathMetricsCollector.logTopPaths(newSlot,
+            entry -> sb1.append(entry.getKey() + " : " + entry.getValue() + "\n"));
+        Assert.assertTrue(sb1.toString().startsWith("/path1/path2/path3 : 1000"));
+        StringBuilder sb2= new StringBuilder();
+        newSlot = requestPathMetricsCollector.aggregatePaths(3,
+            queue -> !queue.isWriteOperation());
+        requestPathMetricsCollector.logTopPaths(newSlot,
+            entry -> sb2.append(entry.getKey() + " : " + entry.getValue() + "\n"));
+        Assert.assertTrue(sb2.toString().startsWith("/path1/path2/path3 : 1110001"));
+        StringBuilder sb3= new StringBuilder();
+        newSlot = requestPathMetricsCollector.aggregatePaths(4, queue -> true);
+        requestPathMetricsCollector.logTopPaths(newSlot,
+            entry -> sb3.append(entry.getKey() + " : " + entry.getValue() + "\n"));
+        Assert.assertTrue(sb3.toString().startsWith("/path1/path2/path3/path4 : 1110001"));
+    }
+
+    @Test
+    public void testMultiThreadPerf() throws InterruptedException {
+        RequestPathMetricsCollector requestPathMetricsCollector = new RequestPathMetricsCollector();
+        Random rand = new Random(System.currentTimeMillis());
+        Long startTime = System.currentTimeMillis();
+        ThreadPoolExecutor executor =  (ThreadPoolExecutor) Executors.newCachedThreadPool();
+        //call 100k get Data
+        for(int i = 0; i < 100000; i++) {
+            executor.submit( new Thread(() ->
+                requestPathMetricsCollector.registerRequest(getData,
+                    "/path1/path2/path"+rand.nextInt(10))));
+        }
+        //5K create
+        for(int i = 0; i < 5000; i++) {
+            executor.submit( new Thread(() ->
+                requestPathMetricsCollector.registerRequest(create2,
+                    "/path1/path2/path"+rand.nextInt(10) )));
+        }
+        //5K delete
+        for(int i = 0; i < 5000; i++) {
+            executor.submit( new Thread(() ->
+                requestPathMetricsCollector.registerRequest(delete,
+                    "/path1/path2/path"+rand.nextInt(10) )));
+        }
+        //40K getChildren
+        for(int i = 0; i < 40000; i++) {
+            executor.submit( new Thread(() ->
+                requestPathMetricsCollector.registerRequest(getChildren,
+                    "/path1/path2/path"+rand.nextInt(10) )));
+        }
+        executor.shutdown();
+        //wait for at most 10 mill seconds
+        executor.awaitTermination(10, TimeUnit.MILLISECONDS);
+        Assert.assertTrue(executor.isTerminated());
+        Long endTime = System.currentTimeMillis();
+        //less than 2 seconds total time
+        Assert.assertTrue(TimeUnit.MILLISECONDS.toSeconds(endTime-startTime) < 3);
+    }
+}


Mime
View raw message