This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 6d63602 ZOOKEEPER-3430: Observability improvement: provide top N read / write path queries.
6d63602 is described below
commit 6d636025d1c6a10840a27caee9e8933f1dbcbaf0
Author: Michael Han <lhan@twitter.com>
AuthorDate: Thu Aug 1 07:28:24 2019 +0200
ZOOKEEPER-3430: Observability improvement: provide top N read / write path queries.
We would like to better understand the types of workloads that hit ZK, and one aspect of that understanding is being able to answer queries for the top N read and top N write request paths. Knowing the hot request paths will allow us to better optimize for such workloads, for example by enabling path-specific caching or changing the path structure (e.g. breaking a long path into hierarchical paths).
This commit adds a RequestPathMetricsCollector, which collects request path stats and answers such queries. There is also a set of new Java system properties that make this collector highly configurable, including disabling it entirely (the default option is on).
The stats can also be queried through commands, which I will add in a separate pull request once this one has landed, since this one is already big.
Author: Michael Han <lhan@twitter.com>
Reviewers: Enrico Olivelli <eolivelli@apache.org>
Closes #989 from hanm/twitter/065e164bfbbf467a28602da76e9443faacaec1c8
---
.../zookeeper/server/FinalRequestProcessor.java | 25 +-
.../java/org/apache/zookeeper/server/Request.java | 2 +-
.../apache/zookeeper/server/ZooKeeperServer.java | 11 +
.../server/util/RequestPathMetricsCollector.java | 386 +++++++++++++++++
.../util/RequestPathMetricsCollectorTest.java | 464 +++++++++++++++++++++
5 files changed, 881 insertions(+), 7 deletions(-)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
index cd0c0eb..d210d70 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
@@ -75,15 +75,11 @@ import org.apache.zookeeper.proto.SyncResponse;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer;
+import org.apache.zookeeper.server.util.RequestPathMetricsCollector;
+
import org.apache.zookeeper.txn.ErrorTxn;
import org.apache.zookeeper.txn.TxnHeader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Locale;
import java.util.Set;
@@ -99,10 +95,13 @@ import java.util.Set;
public class FinalRequestProcessor implements RequestProcessor {
private static final Logger LOG = LoggerFactory.getLogger(FinalRequestProcessor.class);
+ private final RequestPathMetricsCollector requestPathMetricsCollector;
+
ZooKeeperServer zks;
public FinalRequestProcessor(ZooKeeperServer zks) {
this.zks = zks;
+ this.requestPathMetricsCollector = zks.getRequestPathMetricsCollector();
}
public void processRequest(Request request) {
@@ -312,6 +311,7 @@ public class FinalRequestProcessor implements RequestProcessor {
lastOp = "CREA";
rsp = new CreateResponse(rc.path);
err = Code.get(rc.err);
+ requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.create2:
@@ -320,18 +320,21 @@ public class FinalRequestProcessor implements RequestProcessor {
lastOp = "CREA";
rsp = new Create2Response(rc.path, rc.stat);
err = Code.get(rc.err);
+ requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.delete:
case OpCode.deleteContainer: {
lastOp = "DELE";
err = Code.get(rc.err);
+ requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.setData: {
lastOp = "SETD";
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
+ requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.reconfig: {
@@ -344,6 +347,7 @@ public class FinalRequestProcessor implements RequestProcessor {
lastOp = "SETA";
rsp = new SetACLResponse(rc.stat);
err = Code.get(rc.err);
+ requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.closeSession: {
@@ -357,6 +361,7 @@ public class FinalRequestProcessor implements RequestProcessor {
ByteBufferInputStream.byteBuffer2Record(request.request,
syncRequest);
rsp = new SyncResponse(syncRequest.getPath());
+ requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath());
break;
}
case OpCode.check: {
@@ -378,6 +383,7 @@ public class FinalRequestProcessor implements RequestProcessor {
Stat stat = zks.getZKDatabase().statNode(path, existsRequest
.getWatch() ? cnxn : null);
rsp = new ExistsResponse(stat);
+ requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.getData: {
@@ -387,6 +393,7 @@ public class FinalRequestProcessor implements RequestProcessor {
getDataRequest);
path = getDataRequest.getPath();
rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
+ requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.setWatches: {
@@ -419,6 +426,8 @@ public class FinalRequestProcessor implements RequestProcessor {
Stat stat = new Stat();
List<ACL> acl =
zks.getZKDatabase().getACL(path, stat);
+ requestPathMetricsCollector.registerRequest(request.type, getACLRequest.getPath());
+
try {
PrepRequestProcessor.checkACL(zks, request.cnxn, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.ADMIN,
@@ -446,6 +455,7 @@ public class FinalRequestProcessor implements RequestProcessor {
getChildrenRequest);
path = getChildrenRequest.getPath();
rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo);
+ requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.getAllChildrenNumber: {
@@ -484,6 +494,7 @@ public class FinalRequestProcessor implements RequestProcessor {
path, stat, getChildren2Request
.getWatch() ? cnxn : null);
rsp = new GetChildren2Response(children, stat);
+ requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.checkWatches: {
@@ -500,6 +511,7 @@ public class FinalRequestProcessor implements RequestProcessor {
path, type);
throw new KeeperException.NoWatcherException(msg);
}
+ requestPathMetricsCollector.registerRequest(request.type, checkWatches.getPath());
break;
}
case OpCode.removeWatches: {
@@ -516,6 +528,7 @@ public class FinalRequestProcessor implements RequestProcessor {
path, type);
throw new KeeperException.NoWatcherException(msg);
}
+ requestPathMetricsCollector.registerRequest(request.type, removeWatches.getPath());
break;
}
case OpCode.getEphemerals: {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index e4fdb4d..769fc87 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -278,7 +278,7 @@ public class Request {
}
}
- static String op2String(int op) {
+ public static String op2String(int op) {
switch (op) {
case OpCode.notification:
return "notification";
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 60ab655..b74507b 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -73,6 +73,7 @@ import org.apache.zookeeper.server.auth.ProviderRegistry;
import org.apache.zookeeper.server.auth.ServerAuthenticationProvider;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
+import org.apache.zookeeper.server.util.RequestPathMetricsCollector;
import org.apache.zookeeper.server.util.JvmPauseMonitor;
import org.apache.zookeeper.server.util.OSMXBean;
import org.apache.zookeeper.txn.CreateSessionTxn;
@@ -124,6 +125,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
private boolean isResponseCachingEnabled = true;
/* contains the configuration file content read at startup */
protected String initialConfig;
+ private final RequestPathMetricsCollector requestPathMetricsCollector;
protected enum State {
INITIAL, RUNNING, SHUTDOWN, ERROR
@@ -209,6 +211,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
public ZooKeeperServer() {
listener = new ZooKeeperServerListenerImpl(this);
serverStats = new ServerStats(this);
+ this.requestPathMetricsCollector = new RequestPathMetricsCollector();
}
/**
@@ -236,6 +239,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
this.initialConfig = initialConfig;
+ this.requestPathMetricsCollector = new RequestPathMetricsCollector();
+
LOG.info("Created server with tickTime " + tickTime
+ " minSessionTimeout " + getMinSessionTimeout()
+ " maxSessionTimeout " + getMaxSessionTimeout()
@@ -279,6 +284,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
return serverStats;
}
+ public RequestPathMetricsCollector getRequestPathMetricsCollector() {
+ return requestPathMetricsCollector;
+ }
+
public BlueThrottle connThrottle() {
return connThrottle;
}
@@ -576,6 +585,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
registerMetrics();
setState(State.RUNNING);
+ requestPathMetricsCollector.start();
notifyAll();
}
@@ -727,6 +737,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
}
+ requestPathMetricsCollector.shutdown();
unregisterJMX();
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RequestPathMetricsCollector.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RequestPathMetricsCollector.java
new file mode 100644
index 0000000..c5baac0
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RequestPathMetricsCollector.java
@@ -0,0 +1,386 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringTokenizer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.server.Request;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.zookeeper.ZooDefs.OpCode.checkWatches;
+import static org.apache.zookeeper.ZooDefs.OpCode.create;
+import static org.apache.zookeeper.ZooDefs.OpCode.create2;
+import static org.apache.zookeeper.ZooDefs.OpCode.createContainer;
+import static org.apache.zookeeper.ZooDefs.OpCode.delete;
+import static org.apache.zookeeper.ZooDefs.OpCode.deleteContainer;
+import static org.apache.zookeeper.ZooDefs.OpCode.exists;
+import static org.apache.zookeeper.ZooDefs.OpCode.getACL;
+import static org.apache.zookeeper.ZooDefs.OpCode.getChildren;
+import static org.apache.zookeeper.ZooDefs.OpCode.getChildren2;
+import static org.apache.zookeeper.ZooDefs.OpCode.getData;
+import static org.apache.zookeeper.ZooDefs.OpCode.removeWatches;
+import static org.apache.zookeeper.ZooDefs.OpCode.setACL;
+import static org.apache.zookeeper.ZooDefs.OpCode.setData;
+import static org.apache.zookeeper.ZooDefs.OpCode.sync;
+
+/**
+ * This class holds the request path stats (up to a certain depth) per request type.
+ */
+public class RequestPathMetricsCollector {
+ private static final Logger LOG = LoggerFactory.getLogger(RequestPathMetricsCollector.class);
+ // How many seconds does each slot represent, default is 15 seconds.
+ private final int REQUEST_STATS_SLOT_DURATION;
+ // How many slots we keep, default is 60 so it's 15 minutes total history.
+ private final int REQUEST_STATS_SLOT_CAPACITY;
+ // How far down the path we keep, default is 6.
+ private final int REQUEST_PREPROCESS_PATH_DEPTH;
+ // Sample rate, default is 0.1 (10%).
+ private final float REQUEST_PREPROCESS_SAMPLE_RATE;
+ // Delay before the first periodic top-paths log; start() schedules it in minutes, default is 5.
+ private final long COLLECTOR_INITIAL_DELAY;
+ // Delay between two periodic top-paths logs; start() schedules it in minutes, default is 5.
+ private final long COLLECTOR_DELAY;
+ // Maximum number of paths reported by logTopPaths, default is 20.
+ private final int REQUEST_PREPROCESS_TOPPATH_MAX;
+ // Whether the collector is active. Read with Boolean.getBoolean, so it is false
+ // unless the system property is set to "true".
+ private final boolean enabled;
+
+ // Names of the Java system properties read by the constructor.
+ public static final String PATH_STATS_SLOT_CAPACITY = "zookeeper.pathStats.slotCapacity";
+ public static final String PATH_STATS_SLOT_DURATION = "zookeeper.pathStats.slotDuration";
+ public static final String PATH_STATS_MAX_DEPTH = "zookeeper.pathStats.maxDepth";
+ public static final String PATH_STATS_SAMPLE_RATE = "zookeeper.pathStats.sampleRate";
+ public static final String PATH_STATS_COLLECTOR_INITIAL_DELAY = "zookeeper.pathStats.initialDelay";
+ public static final String PATH_STATS_COLLECTOR_DELAY = "zookeeper.pathStats.delay";
+ public static final String PATH_STATS_TOP_PATH_MAX = "zookeeper.pathStats.topPathMax";
+ public static final String PATH_STATS_ENABLED = "zookeeper.pathStats.enabled";
+ // Separator used to split and rebuild paths in trimPathDepth.
+ private static final String PATH_SEPERATOR = "/";
+
+ // One PathStatsQueue per tracked request type, keyed by Request.op2String(opCode).
+ // Filled once in the constructor and then wrapped as an unmodifiable map.
+ private final Map<String, PathStatsQueue> immutableRequestsMap;
+ // Randomness for request sampling and for the staggered start delay of each queue.
+ private final Random sampler;
+ // Runs the per-queue slot rotation tasks and the periodic top-paths logging task.
+ private final ScheduledThreadPoolExecutor scheduledExecutor;
+ // When true, slot rotation and collectStats synchronize on the slot queue itself;
+ // the in-code comments describe this as a real lock intended for tests.
+ private final boolean accurateMode;
+
+ /**
+  * Creates a collector with {@code accurateMode} turned off.
+  */
+ public RequestPathMetricsCollector() {
+     this(false);
+ }
+
+ /**
+  * Creates a collector configured from the {@code zookeeper.pathStats.*} Java system
+  * properties and registers one {@link PathStatsQueue} per tracked request type.
+  * Nothing is scheduled here; background work only begins in {@link #start()}.
+  *
+  * <p>Note that {@code zookeeper.pathStats.enabled} is read with
+  * {@link Boolean#getBoolean(String)}, so the collector is disabled unless the property
+  * is explicitly set to {@code true}.
+  *
+  * @param accurateMode when true, slot rotation and stats collection synchronize on the
+  *                     per-type slot queue instead of running lock free
+  * @throws NumberFormatException if {@code zookeeper.pathStats.sampleRate} is not a valid float
+  */
+ public RequestPathMetricsCollector(boolean accurateMode) {
+     final Map<String, PathStatsQueue> requestsMap = new HashMap<>();
+     this.sampler = new Random(System.currentTimeMillis());
+     this.accurateMode = accurateMode;
+
+     REQUEST_PREPROCESS_TOPPATH_MAX = Integer.getInteger(PATH_STATS_TOP_PATH_MAX, 20);
+     REQUEST_STATS_SLOT_DURATION = Integer.getInteger(PATH_STATS_SLOT_DURATION, 15);
+     REQUEST_STATS_SLOT_CAPACITY = Integer.getInteger(PATH_STATS_SLOT_CAPACITY, 60);
+     REQUEST_PREPROCESS_PATH_DEPTH = Integer.getInteger(PATH_STATS_MAX_DEPTH, 6);
+     REQUEST_PREPROCESS_SAMPLE_RATE = Float.parseFloat(
+             System.getProperty(PATH_STATS_SAMPLE_RATE, "0.1"));
+     COLLECTOR_INITIAL_DELAY = Long.getLong(PATH_STATS_COLLECTOR_INITIAL_DELAY, 5);
+     COLLECTOR_DELAY = Long.getLong(PATH_STATS_COLLECTOR_DELAY, 5);
+     enabled = Boolean.getBoolean(PATH_STATS_ENABLED);
+
+     // Log every effective setting, including the sample rate and the top-path limit,
+     // so that the whole configuration can be read back from the server log.
+     LOG.info("{} = {}", PATH_STATS_SLOT_CAPACITY, REQUEST_STATS_SLOT_CAPACITY);
+     LOG.info("{} = {}", PATH_STATS_SLOT_DURATION, REQUEST_STATS_SLOT_DURATION);
+     LOG.info("{} = {}", PATH_STATS_MAX_DEPTH, REQUEST_PREPROCESS_PATH_DEPTH);
+     LOG.info("{} = {}", PATH_STATS_SAMPLE_RATE, REQUEST_PREPROCESS_SAMPLE_RATE);
+     LOG.info("{} = {}", PATH_STATS_TOP_PATH_MAX, REQUEST_PREPROCESS_TOPPATH_MAX);
+     LOG.info("{} = {}", PATH_STATS_COLLECTOR_INITIAL_DELAY, COLLECTOR_INITIAL_DELAY);
+     LOG.info("{} = {}", PATH_STATS_COLLECTOR_DELAY, COLLECTOR_DELAY);
+     LOG.info("{} = {}", PATH_STATS_ENABLED, enabled);
+
+     this.scheduledExecutor = (ScheduledThreadPoolExecutor) Executors
+             .newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
+     // Once shutdown() is called, neither periodic nor delayed tasks may keep running.
+     scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+     scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+
+     // The request types whose paths are tracked. PathStatsQueue reads
+     // REQUEST_STATS_SLOT_CAPACITY, so this must stay after the property parsing above.
+     final int[] trackedOpCodes = {
+         create, create2, createContainer, deleteContainer, delete,
+         exists, setData, getData, getACL, setACL,
+         getChildren, getChildren2, checkWatches, removeWatches, sync
+     };
+     for (int opCode : trackedOpCodes) {
+         requestsMap.put(Request.op2String(opCode), new PathStatsQueue(opCode));
+     }
+     this.immutableRequestsMap = java.util.Collections.unmodifiableMap(requestsMap);
+ }
+
+ /**
+  * Tells whether this collector classifies the given op code as a write operation.
+  * Note that {@code sync} and {@code check} are counted as writes here.
+  *
+  * @param requestType a {@link ZooDefs.OpCode} value
+  * @return true for the op codes listed below, false for every other value
+  */
+ static boolean isWriteOp(int requestType) {
+     switch (requestType) {
+         case ZooDefs.OpCode.create:
+         case ZooDefs.OpCode.create2:
+         case ZooDefs.OpCode.createContainer:
+         case ZooDefs.OpCode.delete:
+         case ZooDefs.OpCode.deleteContainer:
+         case ZooDefs.OpCode.setData:
+         case ZooDefs.OpCode.setACL:
+         case ZooDefs.OpCode.multi:
+         case ZooDefs.OpCode.check:
+         case ZooDefs.OpCode.reconfig:
+         case ZooDefs.OpCode.sync:
+             return true;
+         default:
+             return false;
+     }
+ }
+
+ /**
+  * Truncates a path to at most {@code maxDepth} leading components.
+  * Empty components (repeated or trailing separators) are dropped and the result always
+  * starts with the separator, e.g. {@code ("/a//b/c", 2)} gives {@code "/a/b"}.
+  * When no component is kept (the root path, or {@code maxDepth <= 0}) the root path
+  * {@code "/"} is returned rather than an empty string, so that such requests are not
+  * reported under an empty key.
+  *
+  * @param path     the request path, must not be null
+  * @param maxDepth the maximum number of components to keep
+  * @return the truncated path
+  */
+ static String trimPathDepth(String path, int maxDepth) {
+     final StringBuilder trimmed = new StringBuilder();
+     final StringTokenizer components = new StringTokenizer(path, PATH_SEPERATOR);
+     for (int depth = 0; depth < maxDepth && components.hasMoreTokens(); depth++) {
+         trimmed.append(PATH_SEPERATOR).append(components.nextToken());
+     }
+     // Nothing kept means the path had no components or the depth limit was not positive.
+     return trimmed.length() == 0 ? PATH_SEPERATOR : trimmed.toString();
+ }
+
+ /**
+  * Stops the scheduled tasks of this collector by shutting the executor down.
+  * Does nothing when the collector is disabled.
+  */
+ public void shutdown() {
+     if (enabled) {
+         LOG.info("shutdown scheduledExecutor");
+         scheduledExecutor.shutdownNow();
+     }
+ }
+
+ /**
+  * Starts the collector: starts the slot rotation of every per-type queue and schedules
+  * the periodic logging of the top read and top write paths.
+  * Does nothing when the collector is disabled.
+  */
+ public void start() {
+     if (!enabled) {
+         return;
+     }
+
+     LOG.info("Start the RequestPath collector");
+     immutableRequestsMap.forEach((opType, pathStatsQueue) -> pathStatsQueue.start());
+
+     // Periodically log the top read/write paths, aggregated at depth 4. The initial delay
+     // and the delay between runs are configurable and expressed in minutes.
+     // SLF4J only substitutes "{}" placeholders, so no "%n" is used in the messages:
+     // it would be logged literally.
+     scheduledExecutor.scheduleWithFixedDelay(() -> {
+         LOG.info("Here are the top Read paths:");
+         logTopPaths(aggregatePaths(4, queue -> !queue.isWriteOperation()),
+                 entry -> LOG.info("{} : {}", entry.getKey(), entry.getValue()));
+         LOG.info("Here are the top Write paths:");
+         logTopPaths(aggregatePaths(4, queue -> queue.isWriteOperation()),
+                 entry -> LOG.info("{} : {}", entry.getKey(), entry.getValue()));
+     }, COLLECTOR_INITIAL_DELAY, COLLECTOR_DELAY, TimeUnit.MINUTES);
+ }
+
+ /**
+  * The public entry point for recording a request. FinalRequestProcessor calls into this
+  * for each request that has a path, so it needs to be fast. Requests are sampled so that
+  * we don't have to store too many paths in memory.
+  *
+  * <p>Requests are ignored when the collector is disabled or when {@code path} is null.
+  *
+  * @param type the op code of the request
+  * @param path the path the request operates on, may be null
+  */
+ public void registerRequest(int type, String path) {
+     // The slots are ConcurrentLinkedQueues, which reject null elements with a
+     // NullPointerException; collecting metrics must never fail the request itself.
+     if (!enabled || path == null) {
+         return;
+     }
+     // nextFloat() is in [0.0, 1.0): the strict comparison makes a rate of 0 never sample
+     // while a rate of 1.0 still samples every request.
+     if (sampler.nextFloat() < REQUEST_PREPROCESS_SAMPLE_RATE) {
+         final PathStatsQueue pathStatsQueue = immutableRequestsMap.get(Request.op2String(type));
+         if (pathStatsQueue != null) {
+             pathStatsQueue.registerRequest(path);
+         } else {
+             LOG.error("We should not handle {}", type);
+         }
+     }
+ }
+
+ /**
+  * Prints the top paths of a single request type.
+  * Nothing is printed when {@code queryMaxDepth} is below 1; an unknown request type
+  * name only produces a "Can not find" line.
+  *
+  * @param pwriter         where the report is written
+  * @param requestTypeName the request type name as produced by {@code Request.op2String}
+  * @param queryMaxDepth   requested path depth, capped at the configured maximum depth
+  */
+ public void dumpTopRequestPath(PrintWriter pwriter, String requestTypeName, int queryMaxDepth) {
+     if (queryMaxDepth < 1) {
+         return;
+     }
+     final PathStatsQueue queue = immutableRequestsMap.get(requestTypeName);
+     if (queue == null) {
+         pwriter.println("Can not find path stats for type: " + requestTypeName);
+         return;
+     }
+     pwriter.println("The top requests of type: " + requestTypeName);
+     final int depth = Math.min(queryMaxDepth, REQUEST_PREPROCESS_PATH_DEPTH);
+     logTopPaths(queue.collectStats(depth),
+             entry -> pwriter.println(entry.getKey() + " : " + entry.getValue()));
+ }
+
+ /**
+  * Prints a header followed by the top paths aggregated over every request type that is
+  * not classified as a write operation.
+  */
+ public void dumpTopReadPaths(PrintWriter pwriter, int queryMaxDepth) {
+     pwriter.println("The top read requests are");
+     final Predicate<PathStatsQueue> readQueues = queue -> !queue.isWriteOperation();
+     dumpTopAggregatedPaths(pwriter, queryMaxDepth, readQueues);
+ }
+
+ /**
+  * Prints a header followed by the top paths aggregated over every request type that is
+  * classified as a write operation.
+  */
+ public void dumpTopWritePaths(PrintWriter pwriter, int queryMaxDepth) {
+     pwriter.println("The top write requests are");
+     final Predicate<PathStatsQueue> writeQueues = PathStatsQueue::isWriteOperation;
+     dumpTopAggregatedPaths(pwriter, queryMaxDepth, writeQueues);
+ }
+
+ /**
+  * Prints a header followed by the top paths aggregated over all tracked request types.
+  */
+ public void dumpTopPaths(PrintWriter pwriter, int queryMaxDepth) {
+     pwriter.println("The top requests are");
+     final Predicate<PathStatsQueue> everyQueue = queue -> true;
+     dumpTopAggregatedPaths(pwriter, queryMaxDepth, everyQueue);
+ }
+
+ /**
+  * Merges the stats of every queue accepted by the predicate and prints the resulting
+  * top paths, one "path : count" line each. Prints nothing when the collector is disabled.
+  */
+ private void dumpTopAggregatedPaths(
+         PrintWriter pwriter, int queryMaxDepth, final Predicate<PathStatsQueue> predicate) {
+     if (enabled) {
+         logTopPaths(aggregatePaths(queryMaxDepth, predicate),
+                 entry -> pwriter.println(entry.getKey() + " : " + entry.getValue()));
+     }
+ }
+
+ /**
+  * Sums the per-path counts of every queue accepted by the predicate.
+  *
+  * @param queryMaxDepth requested path depth, capped at the configured maximum depth
+  * @param predicate     selects the queues to include
+  * @return a new map from trimmed path to total count
+  */
+ Map<String, Integer> aggregatePaths(int queryMaxDepth, Predicate<PathStatsQueue> predicate) {
+     final Map<String, Integer> totals = new HashMap<>(REQUEST_PREPROCESS_TOPPATH_MAX);
+     final int depth = Math.min(queryMaxDepth, REQUEST_PREPROCESS_PATH_DEPTH);
+     for (PathStatsQueue queue : immutableRequestsMap.values()) {
+         if (!predicate.test(queue)) {
+             continue;
+         }
+         for (Map.Entry<String, Integer> pathCount : queue.collectStats(depth).entrySet()) {
+             totals.merge(pathCount.getKey(), pathCount.getValue(), Integer::sum);
+         }
+     }
+     return totals;
+ }
+
+ /**
+  * Hands the entries with the highest counts to {@code output}, highest count first,
+  * limited to the configured maximum number of top paths.
+  */
+ void logTopPaths(Map<String, Integer> combinedMap, final Consumer<Map.Entry<String, Integer>> output) {
+     // sort by path count, largest first
+     final Comparator<Map.Entry<String, Integer>> byCountDescending =
+             Comparator.comparing(Map.Entry<String, Integer>::getValue).reversed();
+     combinedMap.entrySet().stream()
+             .sorted(byCountDescending)
+             .limit(REQUEST_PREPROCESS_TOPPATH_MAX)
+             .forEach(output);
+ }
+
+ class PathStatsQueue {
+
+ private final String requestTypeName;
+ private final AtomicReference<ConcurrentLinkedQueue<String>> currentSlot;
+ private final LinkedBlockingQueue<Map<String, Integer>> requestPathStats;
+ private final boolean isWriteOperation;
+
+ /**
+  * Creates the stats queue of one request type, with an empty current slot and a
+  * history bounded by the configured slot capacity.
+  *
+  * @param requestType the op code this queue collects paths for
+  */
+ public PathStatsQueue(int requestType) {
+     this.requestTypeName = Request.op2String(requestType);
+     this.isWriteOperation = isWriteOp(requestType);
+     this.currentSlot = new AtomicReference<>(new ConcurrentLinkedQueue<>());
+     this.requestPathStats = new LinkedBlockingQueue<>(REQUEST_STATS_SLOT_CAPACITY);
+ }
+
+ /*
+  * The single write entry point of this class, so it has to be fast:
+  * the path is simply appended to the current slot queue, without locking.
+  */
+ public void registerRequest(String path) {
+     if (enabled) {
+         currentSlot.get().offer(path);
+     }
+ }
+
+ /**
+ * Returns the queue that currently receives newly registered paths.
+ * The scheduled rotation task replaces it with a fresh queue on every run.
+ */
+ ConcurrentLinkedQueue<String> getCurrentSlot() {
+ return currentSlot.get();
+ }
+
+ /**
+  * Turns a collection of raw paths into a map from trimmed path to occurrence count:
+  * 1. null entries are skipped
+  * 2. each path is cut to at most maxDepth components
+  * 3. the trimmed paths are counted
+  *
+  * @param maxDepth          maximum number of path components to keep
+  * @param tobeProcessedSlot the paths to aggregate
+  * @return a new map holding the count of each trimmed path
+  */
+ Map<String, Integer> mapReducePaths(int maxDepth, Collection<String> tobeProcessedSlot) {
+     final Map<String, Integer> counts = new ConcurrentHashMap<>();
+     tobeProcessedSlot.stream()
+             .filter(rawPath -> rawPath != null)
+             .map(rawPath -> trimPathDepth(rawPath, maxDepth))
+             .forEach(trimmed -> counts.merge(trimmed, 1, Integer::sum));
+     return counts;
+ }
+
+ /**
+ * The only read point of this class. Combines a snapshot of the current slot with
+ * every slot kept in the history queue, re-trimming all paths to maxDepth.
+ *
+ * @param maxDepth maximum number of path components to keep in the result keys
+ * @return the aggregated path to count map
+ */
+ public Map<String, Integer> collectStats(int maxDepth) {
+ Map<String, Integer> combinedMap;
+ // Take a snapshot of the current slot and convert it to map.
+ // Set the initial size as 0 since we don't want it to padding nulls in the end.
+ Map<String, Integer> snapShot = mapReducePaths(maxDepth,
+ Arrays.asList(currentSlot.get().toArray(new String[0])));
+ // Starting from the snapshot and go through the queue to reduce them into one map
+ // the iterator can run concurrently with write but we want to use a real lock in the test
+ // Outside accurate mode the monitor is a fresh object, so this block does not
+ // exclude the rotation task.
+ synchronized (accurateMode ? requestPathStats : new Object()) {
+ combinedMap =
+ // The reduction accumulates into snapShot itself (firstMap is always that map);
+ // the stored history maps are only read.
+ requestPathStats.stream().reduce(snapShot, (firstMap, secondMap) -> {
+ secondMap.forEach((key, value) -> {
+ // Stored keys were trimmed to the configured depth; trim again to the queried depth.
+ String trimmedPath = trimPathDepth(key, maxDepth);
+ firstMap.put(trimmedPath, firstMap.getOrDefault(trimmedPath, 0) + value);
+ }
+ );
+ return firstMap;
+ });
+ }
+ return combinedMap;
+ }
+
+ /**
+  * Schedules the periodic pre-processing of the current slot: on every run the current
+  * slot is swapped for a fresh one, aggregated into a path-to-count map and appended to
+  * the bounded history, evicting the oldest slot when the history is full.
+  * Does nothing when the collector is disabled.
+  */
+ public void start() {
+     if (!enabled) {
+         return;
+     }
+     // Staggered start: a random initial delay below the slot duration, after which the
+     // task runs once per slot duration (15 seconds by default).
+     int delay = sampler.nextInt(REQUEST_STATS_SLOT_DURATION);
+     // Fixed delay is used rather than fixed rate: with a fixed rate, a run that overruns
+     // its period is followed immediately by the next one.
+     scheduledExecutor.scheduleWithFixedDelay(() -> {
+         // Generate new slot so new requests will go here.
+         ConcurrentLinkedQueue<String> tobeProcessedSlot =
+                 currentSlot.getAndSet(new ConcurrentLinkedQueue<>());
+         try {
+             // Pre-process the last slot and queue it up. Only this scheduled task modifies
+             // the history, but it can interleave with collectStats, hence the real lock
+             // in accurate mode.
+             Map<String, Integer> latestSlot =
+                     mapReducePaths(REQUEST_PREPROCESS_PATH_DEPTH, tobeProcessedSlot);
+             synchronized (accurateMode ? requestPathStats : new Object()) {
+                 // Drop the oldest slot when the history is full.
+                 if (requestPathStats.remainingCapacity() <= 0) {
+                     requestPathStats.poll();
+                 }
+                 if (!requestPathStats.offer(latestSlot)) {
+                     LOG.error("Failed to insert the new request path stats for {}", requestTypeName);
+                 }
+             }
+         } catch (Exception e) {
+             // The exception is the trailing argument without a placeholder of its own,
+             // so that SLF4J logs it as a throwable, stack trace included.
+             LOG.error("Failed to insert the new request path stats for {} with exception",
+                     requestTypeName, e);
+         }
+     }, delay, REQUEST_STATS_SLOT_DURATION, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Tells whether the request type of this queue was classified as a write by
+ * isWriteOp when the queue was created.
+ */
+ boolean isWriteOperation() {
+ return isWriteOperation;
+ }
+ }
+}
+
+
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/RequestPathMetricsCollectorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/RequestPathMetricsCollectorTest.java
new file mode 100644
index 0000000..1ed8a4e
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/RequestPathMetricsCollectorTest.java
@@ -0,0 +1,464 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.Assert;
+
+import static org.apache.zookeeper.ZooDefs.OpCode.create;
+import static org.apache.zookeeper.ZooDefs.OpCode.create2;
+import static org.apache.zookeeper.ZooDefs.OpCode.delete;
+import static org.apache.zookeeper.ZooDefs.OpCode.exists;
+import static org.apache.zookeeper.ZooDefs.OpCode.getChildren;
+import static org.apache.zookeeper.ZooDefs.OpCode.getChildren2;
+import static org.apache.zookeeper.ZooDefs.OpCode.getData;
+import static org.apache.zookeeper.ZooDefs.OpCode.setData;
+
+public class RequestPathMetricsCollectorTest {
+
+ @Before
+ public void setUp() {
+ System.setProperty("zookeeper.pathStats.enabled", "true");
+ System.setProperty("zookeeper.pathStats.slotCapacity", "60");
+ System.setProperty("zookeeper.pathStats.slotDuration", "1");
+ System.setProperty("zookeeper.pathStats.maxDepth", "6");
+ System.setProperty("zookeeper.pathStats.sampleRate", "1.0");
+ }
+
+    /**
+     * Clears the {@code zookeeper.pathStats.*} system properties after each test.
+     */
+    @After
+    public void tearDown() {
+        final String[] pathStatsKeys = {
+            "zookeeper.pathStats.enabled",
+            "zookeeper.pathStats.slotCapacity",
+            "zookeeper.pathStats.slotDuration",
+            "zookeeper.pathStats.maxDepth",
+            "zookeeper.pathStats.sampleRate",
+        };
+        for (String key : pathStatsKeys) {
+            System.clearProperty(key);
+        }
+    }
+
+    /**
+     * Checks {@code trimPathDepth} for depths below, equal to and above the depth of the input
+     * path, and for paths with a doubled leading slash and non-alphanumeric characters.
+     */
+    @Test
+    public void testTrimPath() {
+        // normal cases
+        String trimmedPath = RequestPathMetricsCollector.trimPathDepth("/p1/p2/p3", 1);
+        Assert.assertEquals("/p1", trimmedPath);
+        trimmedPath = RequestPathMetricsCollector.trimPathDepth("/p1/p2/p3", 2);
+        Assert.assertEquals("/p1/p2", trimmedPath);
+        trimmedPath = RequestPathMetricsCollector.trimPathDepth("/p1/p2/p3", 3);
+        Assert.assertEquals("/p1/p2/p3", trimmedPath);
+        // asking for more levels than the path has returns the whole path
+        trimmedPath = RequestPathMetricsCollector.trimPathDepth("/p1/p2/p3", 4);
+        Assert.assertEquals("/p1/p2/p3", trimmedPath);
+        // some extra symbols; the doubled leading slash is expected to collapse to one
+        trimmedPath = RequestPathMetricsCollector.trimPathDepth("//p1 next/p2.index/p3:next", 3);
+        Assert.assertEquals("/p1 next/p2.index/p3:next", trimmedPath);
+        trimmedPath = RequestPathMetricsCollector.trimPathDepth("//p1 next/p2.index/p3:next", 2);
+        Assert.assertEquals("/p1 next/p2.index", trimmedPath);
+        trimmedPath = RequestPathMetricsCollector.trimPathDepth("//p1 next/p2.index/p3:next", 6);
+        Assert.assertEquals("/p1 next/p2.index/p3:next", trimmedPath);
+    }
+
+    /**
+     * Registers 1,111,111 requests at path depths 1 through 7 from three threads and checks the
+     * number of distinct keys and selected counts produced by {@code mapReducePaths} on the
+     * current slot for each trimming depth from 1 to 7.
+     */
+    @Test
+    public void testQueueMapReduce() throws InterruptedException {
+        RequestPathMetricsCollector requestPathMetricsCollector = new RequestPathMetricsCollector();
+        RequestPathMetricsCollector.PathStatsQueue pathStatsQueue =
+                requestPathMetricsCollector.new PathStatsQueue(create2);
+        // 1,000,000 distinct depth-7 paths
+        Thread path7 = new Thread(() -> {
+            for (int i = 0; i < 1000000; i++) {
+                pathStatsQueue.registerRequest(
+                        "/path1/path2/path3/path4/path5/path6/path7" + "_" + i);
+            }
+        });
+        path7.start();
+        // 100,000 distinct depth-6 paths (the bare "path6" plus 99,999 suffixed ones)
+        Thread path6 = new Thread(() -> {
+            pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5/path6");
+            for (int i = 1; i < 100000; i++) {
+                pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5/path6" + "_" + i);
+            }
+        });
+        path6.start();
+        // 1 + 10 + 100 + 1,000 + 10,000 requests at depths 1 to 5 from this thread
+        pathStatsQueue.registerRequest("/path1");
+        for (int i = 0; i < 10; i++) {
+            pathStatsQueue.registerRequest("/path1/path2" + "_" + i);
+        }
+        for (int i = 0; i < 100; i++) {
+            pathStatsQueue.registerRequest("/path1/path2/path3" + "_" + i);
+        }
+        for (int i = 0; i < 1000; i++) {
+            pathStatsQueue.registerRequest("/path1/path2/path3/path4" + "_" + i);
+        }
+        for (int i = 0; i < 10000; i++) {
+            pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5" + "_" + i);
+        }
+        path6.join();
+        path7.join();
+        //cut up to 1
+        Map<String, Integer> newSlot =
+                pathStatsQueue.mapReducePaths(1, pathStatsQueue.getCurrentSlot());
+        Assert.assertEquals(1, newSlot.size());
+        Assert.assertEquals(1111111, newSlot.get("/path1").intValue());
+        //cut up to 2
+        newSlot = pathStatsQueue.mapReducePaths(2, pathStatsQueue.getCurrentSlot());
+        Assert.assertEquals(12, newSlot.size());
+        Assert.assertEquals(1, newSlot.get("/path1").intValue());
+        Assert.assertEquals(1111100, newSlot.get("/path1/path2").intValue());
+        //cut up to 3
+        newSlot = pathStatsQueue.mapReducePaths(3, pathStatsQueue.getCurrentSlot());
+        Assert.assertEquals(112, newSlot.size());
+        Assert.assertEquals(1, newSlot.get("/path1").intValue());
+        Assert.assertEquals(1111000, newSlot.get("/path1/path2/path3").intValue());
+        //cut up to 4
+        newSlot = pathStatsQueue.mapReducePaths(4, pathStatsQueue.getCurrentSlot());
+        Assert.assertEquals(1112, newSlot.size());
+        Assert.assertEquals(1110000, newSlot.get("/path1/path2/path3/path4").intValue());
+        //cut up to 5
+        newSlot = pathStatsQueue.mapReducePaths(5, pathStatsQueue.getCurrentSlot());
+        Assert.assertEquals(11112, newSlot.size());
+        Assert.assertEquals(1100000, newSlot.get("/path1/path2/path3/path4/path5").intValue());
+        //cut up to 6
+        newSlot = pathStatsQueue.mapReducePaths(6, pathStatsQueue.getCurrentSlot());
+        Assert.assertEquals(111111, newSlot.size());
+        Assert.assertEquals(1000001,
+                newSlot.get("/path1/path2/path3/path4/path5/path6").intValue());
+        //cut up to 7
+        newSlot = pathStatsQueue.mapReducePaths(7, pathStatsQueue.getCurrentSlot());
+        Assert.assertEquals(1111111, newSlot.size());
+    }
+
+    /**
+     * Verifies that a queue which never received a request yields empty results, both from
+     * {@code mapReducePaths} on the current slot and from {@code collectStats} after the queue
+     * has been started and left running for a while.
+     */
+    @Test
+    public void testCollectEmptyStats() throws InterruptedException {
+        RequestPathMetricsCollector requestPathMetricsCollector = new RequestPathMetricsCollector();
+        RequestPathMetricsCollector.PathStatsQueue pathStatsQueue =
+                requestPathMetricsCollector.new PathStatsQueue(getChildren);
+        Thread.sleep(5000);
+        Map<String, Integer> newSlot =
+                pathStatsQueue.mapReducePaths(3, pathStatsQueue.getCurrentSlot());
+        Assert.assertTrue(newSlot.isEmpty());
+        pathStatsQueue.start();
+        // Leave the scheduled pre-processing running for a while before collecting.
+        Thread.sleep(15000);
+        newSlot = pathStatsQueue.collectStats(1);
+        Assert.assertEquals(0, newSlot.size());
+        newSlot = pathStatsQueue.collectStats(2);
+        Assert.assertEquals(0, newSlot.size());
+        newSlot = pathStatsQueue.collectStats(5);
+        Assert.assertEquals(0, newSlot.size());
+    }
+
+    /**
+     * Starts a queue on a collector built with {@code new RequestPathMetricsCollector(true)},
+     * registers 1,111,112 requests at path depths 1 through 7 from three threads with pauses in
+     * between, and checks the number of keys and selected counts returned by
+     * {@code collectStats} for depths 1 to 6.
+     */
+    @Test
+    public void testCollectStats() throws InterruptedException {
+        RequestPathMetricsCollector requestPathMetricsCollector =
+                new RequestPathMetricsCollector(true);
+        RequestPathMetricsCollector.PathStatsQueue pathStatsQueue =
+                requestPathMetricsCollector.new PathStatsQueue(getChildren);
+        pathStatsQueue.start();
+        // 10 bursts of 100,000 distinct depth-7 paths
+        Thread path7 = new Thread(() -> {
+            for (int i = 0; i < 10; i++) {
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException e) {
+                    // Keep the interrupt visible instead of swallowing it.
+                    Thread.currentThread().interrupt();
+                }
+                for (int j = 0; j < 100000; j++) {
+                    pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5/path6/path7"
+                            + "_" + i + "_" + j);
+                }
+            }
+        });
+        path7.start();
+        // the bare depth-6 path once, then 10 bursts of 10,000 distinct depth-6 paths
+        Thread path6 = new Thread(() -> {
+            pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5/path6");
+            for (int i = 0; i < 10; i++) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    // Keep the interrupt visible instead of swallowing it.
+                    Thread.currentThread().interrupt();
+                }
+                for (int j = 0; j < 10000; j++) {
+                    pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5/path6"
+                            + "_" + i + "_" + j);
+                }
+            }
+        });
+        path6.start();
+        pathStatsQueue.registerRequest("/path1");
+        for (int i = 0; i < 10; i++) {
+            pathStatsQueue.registerRequest("/path1/path2" + "_" + i);
+        }
+        for (int i = 0; i < 100; i++) {
+            pathStatsQueue.registerRequest("/path1/path2/path3" + "_" + i);
+        }
+        for (int i = 0; i < 1000; i++) {
+            pathStatsQueue.registerRequest("/path1/path2/path3/path4" + "_" + i);
+        }
+        // The method already declares InterruptedException, so let an interrupt fail the test.
+        Thread.sleep(1000);
+        for (int i = 0; i < 10000; i++) {
+            pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5" + "_" + i);
+        }
+        path6.join();
+        path7.join();
+        //cut up to 1
+        Map<String, Integer> newSlot = pathStatsQueue.collectStats(1);
+        Assert.assertEquals(1, newSlot.size());
+        Assert.assertEquals(1111112, newSlot.get("/path1").intValue());
+        //cut up to 2
+        newSlot = pathStatsQueue.collectStats(2);
+        Assert.assertEquals(12, newSlot.size());
+        Assert.assertEquals(1, newSlot.get("/path1").intValue());
+        Assert.assertEquals(1111101, newSlot.get("/path1/path2").intValue());
+        //cut up to 3
+        newSlot = pathStatsQueue.collectStats(3);
+        Assert.assertEquals(112, newSlot.size());
+        Assert.assertEquals(1, newSlot.get("/path1").intValue());
+        Assert.assertEquals(1111001, newSlot.get("/path1/path2/path3").intValue());
+        //cut up to 4
+        newSlot = pathStatsQueue.collectStats(4);
+        Assert.assertEquals(1112, newSlot.size());
+        Assert.assertEquals(1110001, newSlot.get("/path1/path2/path3/path4").intValue());
+        //cut up to 5
+        newSlot = pathStatsQueue.collectStats(5);
+        Assert.assertEquals(11112, newSlot.size());
+        Assert.assertEquals(1100001, newSlot.get("/path1/path2/path3/path4/path5").intValue());
+        //cut up to 6
+        newSlot = pathStatsQueue.collectStats(6);
+        Assert.assertEquals(111112, newSlot.size());
+        Assert.assertEquals(1000001,
+                newSlot.get("/path1/path2/path3/path4/path5/path6").intValue());
+    }
+
+    /**
+     * Registers requests of several op codes at path depths 1 through 7 from three threads on
+     * a collector built with {@code new RequestPathMetricsCollector(true)}, then checks
+     * {@code aggregatePaths} for depths 2 to 7 with an accept-all predicate and with predicates
+     * based on {@code isWriteOperation()}.
+     */
+    @Test
+    public void testAggregate() throws InterruptedException {
+        RequestPathMetricsCollector requestPathMetricsCollector =
+                new RequestPathMetricsCollector(true);
+        // 10 bursts of 100,000 distinct depth-7 getData paths
+        Thread path7 = new Thread(() -> {
+            for (int i = 0; i < 10; i++) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    // Keep the interrupt visible instead of swallowing it.
+                    Thread.currentThread().interrupt();
+                }
+                for (int j = 0; j < 100000; j++) {
+                    requestPathMetricsCollector.registerRequest(getData,
+                            "/path1/path2/path3/path4/path5/path6/path7" + "_" + i + "_" + j);
+                }
+            }
+        });
+        path7.start();
+        // one getChildren2 on the bare depth-6 path, then 10 bursts of 10,000 getChildren
+        Thread path6 = new Thread(() -> {
+            requestPathMetricsCollector.registerRequest(getChildren2,
+                    "/path1/path2/path3/path4/path5/path6");
+            for (int i = 0; i < 10; i++) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // Keep the interrupt visible instead of swallowing it.
+                    Thread.currentThread().interrupt();
+                }
+                for (int j = 0; j < 10000; j++) {
+                    requestPathMetricsCollector.registerRequest(getChildren,
+                            "/path1/path2/path3/path4/path5/path6" + "_" + i + "_" + j);
+                }
+            }
+        });
+        path6.start();
+        requestPathMetricsCollector.registerRequest(create2, "/path1");
+        for (int i = 0; i < 10; i++) {
+            requestPathMetricsCollector.registerRequest(create, "/path1/path2" + "_" + i);
+        }
+        for (int i = 0; i < 100; i++) {
+            requestPathMetricsCollector.registerRequest(delete, "/path1/path2/path3" + "_" + i);
+        }
+        for (int i = 0; i < 1000; i++) {
+            requestPathMetricsCollector.registerRequest(setData,
+                    "/path1/path2/path3/path4" + "_" + i);
+        }
+        for (int i = 0; i < 10000; i++) {
+            requestPathMetricsCollector.registerRequest(exists,
+                    "/path1/path2/path3/path4/path5" + "_" + i);
+        }
+        path6.join();
+        path7.join();
+        //cut up to 2
+        Map<String, Integer> newSlot =
+                requestPathMetricsCollector.aggregatePaths(2, queue -> true);
+        Assert.assertEquals(12, newSlot.size());
+        Assert.assertEquals(1, newSlot.get("/path1").intValue());
+        Assert.assertEquals(1111101, newSlot.get("/path1/path2").intValue());
+        //cut up to 3
+        newSlot = requestPathMetricsCollector.aggregatePaths(3, queue -> true);
+        Assert.assertEquals(112, newSlot.size());
+        Assert.assertEquals(1, newSlot.get("/path1").intValue());
+        Assert.assertEquals(1111001, newSlot.get("/path1/path2/path3").intValue());
+        //cut up to 4
+        newSlot = requestPathMetricsCollector.aggregatePaths(4, queue -> true);
+        Assert.assertEquals(1112, newSlot.size());
+        Assert.assertEquals(1110001, newSlot.get("/path1/path2/path3/path4").intValue());
+        //cut up to 5
+        newSlot = requestPathMetricsCollector.aggregatePaths(5, queue -> true);
+        Assert.assertEquals(11112, newSlot.size());
+        Assert.assertEquals(1100001, newSlot.get("/path1/path2/path3/path4/path5").intValue());
+        //cut up to 6
+        newSlot = requestPathMetricsCollector.aggregatePaths(6, queue -> true);
+        Assert.assertEquals(111112, newSlot.size());
+        Assert.assertEquals(1000001,
+                newSlot.get("/path1/path2/path3/path4/path5/path6").intValue());
+        //cut up to 7 but the initial mapReduce kept only 6
+        newSlot = requestPathMetricsCollector.aggregatePaths(7, queue -> true);
+        Assert.assertEquals(111112, newSlot.size());
+        Assert.assertEquals(1000001,
+                newSlot.get("/path1/path2/path3/path4/path5/path6").intValue());
+        //test predicate
+        //cut up to 4 for all the reads
+        newSlot = requestPathMetricsCollector.aggregatePaths(4,
+                queue -> !queue.isWriteOperation());
+        Assert.assertEquals(1, newSlot.size());
+        Assert.assertEquals(1110001, newSlot.get("/path1/path2/path3/path4").intValue());
+        //cut up to 4 for all the write
+        newSlot = requestPathMetricsCollector.aggregatePaths(4,
+                queue -> queue.isWriteOperation());
+        Assert.assertEquals(1111, newSlot.size());
+        //cut up to 3 for all the write
+        newSlot = requestPathMetricsCollector.aggregatePaths(3,
+                queue -> queue.isWriteOperation());
+        Assert.assertEquals(112, newSlot.size());
+        Assert.assertEquals(1000, newSlot.get("/path1/path2/path3").intValue());
+    }
+
+    /**
+     * Registers the same request mix as {@code testAggregate} and checks that the first entry
+     * emitted by {@code logTopPaths} is the expected path and count for the write-only,
+     * read-only and unfiltered aggregations.
+     */
+    @Test
+    public void testTopPath() throws InterruptedException {
+        RequestPathMetricsCollector requestPathMetricsCollector =
+                new RequestPathMetricsCollector(true);
+        // 10 bursts of 100,000 distinct depth-7 getData paths
+        Thread path7 = new Thread(() -> {
+            for (int i = 0; i < 10; i++) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    // Keep the interrupt visible instead of swallowing it.
+                    Thread.currentThread().interrupt();
+                }
+                for (int j = 0; j < 100000; j++) {
+                    requestPathMetricsCollector.registerRequest(getData,
+                            "/path1/path2/path3/path4/path5/path6/path7" + "_" + i + "_" + j);
+                }
+            }
+        });
+        path7.start();
+        // one getChildren2 on the bare depth-6 path, then 10 bursts of 10,000 getChildren
+        Thread path6 = new Thread(() -> {
+            requestPathMetricsCollector.registerRequest(getChildren2,
+                    "/path1/path2/path3/path4/path5/path6");
+            for (int i = 0; i < 10; i++) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // Keep the interrupt visible instead of swallowing it.
+                    Thread.currentThread().interrupt();
+                }
+                for (int j = 0; j < 10000; j++) {
+                    requestPathMetricsCollector.registerRequest(getChildren,
+                            "/path1/path2/path3/path4/path5/path6" + "_" + i + "_" + j);
+                }
+            }
+        });
+        path6.start();
+        requestPathMetricsCollector.registerRequest(create2, "/path1");
+        for (int i = 0; i < 10; i++) {
+            requestPathMetricsCollector.registerRequest(create, "/path1/path2" + "_" + i);
+        }
+        for (int i = 0; i < 100; i++) {
+            requestPathMetricsCollector.registerRequest(delete, "/path1/path2/path3" + "_" + i);
+        }
+        for (int i = 0; i < 1000; i++) {
+            requestPathMetricsCollector.registerRequest(setData,
+                    "/path1/path2/path3/path4" + "_" + i);
+        }
+        for (int i = 0; i < 10000; i++) {
+            requestPathMetricsCollector.registerRequest(exists,
+                    "/path1/path2/path3/path4/path5" + "_" + i);
+        }
+        path6.join();
+        path7.join();
+        // top write paths at depth 3
+        StringBuilder sb1 = new StringBuilder();
+        Map<String, Integer> newSlot = requestPathMetricsCollector.aggregatePaths(3,
+                queue -> queue.isWriteOperation());
+        requestPathMetricsCollector.logTopPaths(newSlot,
+                entry -> sb1.append(entry.getKey()).append(" : ").append(entry.getValue())
+                        .append("\n"));
+        Assert.assertTrue(sb1.toString().startsWith("/path1/path2/path3 : 1000"));
+        // top read paths at depth 3
+        StringBuilder sb2 = new StringBuilder();
+        newSlot = requestPathMetricsCollector.aggregatePaths(3,
+                queue -> !queue.isWriteOperation());
+        requestPathMetricsCollector.logTopPaths(newSlot,
+                entry -> sb2.append(entry.getKey()).append(" : ").append(entry.getValue())
+                        .append("\n"));
+        Assert.assertTrue(sb2.toString().startsWith("/path1/path2/path3 : 1110001"));
+        // top paths of any kind at depth 4
+        StringBuilder sb3 = new StringBuilder();
+        newSlot = requestPathMetricsCollector.aggregatePaths(4, queue -> true);
+        requestPathMetricsCollector.logTopPaths(newSlot,
+                entry -> sb3.append(entry.getKey()).append(" : ").append(entry.getValue())
+                        .append("\n"));
+        Assert.assertTrue(sb3.toString().startsWith("/path1/path2/path3/path4 : 1110001"));
+    }
+
+ @Test
+ public void testMultiThreadPerf() throws InterruptedException {
+ RequestPathMetricsCollector requestPathMetricsCollector = new RequestPathMetricsCollector();
+ Random rand = new Random(System.currentTimeMillis());
+ Long startTime = System.currentTimeMillis();
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
+ //call 100k get Data
+ for(int i = 0; i < 100000; i++) {
+ executor.submit( new Thread(() ->
+ requestPathMetricsCollector.registerRequest(getData,
+ "/path1/path2/path"+rand.nextInt(10))));
+ }
+ //5K create
+ for(int i = 0; i < 5000; i++) {
+ executor.submit( new Thread(() ->
+ requestPathMetricsCollector.registerRequest(create2,
+ "/path1/path2/path"+rand.nextInt(10) )));
+ }
+ //5K delete
+ for(int i = 0; i < 5000; i++) {
+ executor.submit( new Thread(() ->
+ requestPathMetricsCollector.registerRequest(delete,
+ "/path1/path2/path"+rand.nextInt(10) )));
+ }
+ //40K getChildren
+ for(int i = 0; i < 40000; i++) {
+ executor.submit( new Thread(() ->
+ requestPathMetricsCollector.registerRequest(getChildren,
+ "/path1/path2/path"+rand.nextInt(10) )));
+ }
+ executor.shutdown();
+ //wait for at most 10 mill seconds
+ executor.awaitTermination(10, TimeUnit.MILLISECONDS);
+ Assert.assertTrue(executor.isTerminated());
+ Long endTime = System.currentTimeMillis();
+ //less than 2 seconds total time
+ Assert.assertTrue(TimeUnit.MILLISECONDS.toSeconds(endTime-startTime) < 3);
+ }
+}
|