accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo-testing] branch master updated: Add performance test for small scans (#24)
Date Tue, 31 Jul 2018 15:58:37 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/master by this push:
     new c2eb3f2  Add performance test for small scans (#24)
c2eb3f2 is described below

commit c2eb3f28687aeb0ec6ee2273340b8e68281374f8
Author: Keith Turner <keith@deenlo.com>
AuthorDate: Tue Jul 31 11:58:35 2018 -0400

    Add performance test for small scans (#24)
---
 bin/performance-test                               |  24 +-
 .../performance/tests/RandomCachedLookupsPT.java   | 243 +++++++++++++++++++++
 2 files changed, 256 insertions(+), 11 deletions(-)

diff --git a/bin/performance-test b/bin/performance-test
index 93014e5..b606ad3 100755
--- a/bin/performance-test
+++ b/bin/performance-test
@@ -25,7 +25,7 @@ function print_usage() {
 Usage: performance-test <command> (<argument>)
 
 Possible commands:
-  run <output dir>                 Runs performance tests.
+  run <output dir> [filter]        Runs performance tests.
   compare <result 1> <result 2>    Compares results of two test.
   csv {files}                      Converts results to CSV
 EOF
@@ -70,16 +70,18 @@ case "$1" in
     mkdir -p "$2"
     start_cluster
     CLASSPATH="$CP" java -Dlog4j.configuration="file:$log4j_config" ${perf_pkg}.ListTests | while read test_class; do
-      echo "Running test $test_class"
-      pt_tmp=$(mktemp -d -t accumulo_pt_XXXXXXX)
-      setup_accumulo
-      get_config_file accumulo-site.xml "$pt_tmp"
-      CLASSPATH="$CP" java -Dlog4j.configuration="file:$log4j_config"  ${perf_pkg}.MergeSiteConfig "$test_class" "$pt_tmp"
-      put_config_file "$pt_tmp/accumulo-site.xml"
-      put_server_code "$at_home/core/target/accumulo-testing-core-$at_version.jar"
-      start_accumulo
-      get_config_file accumulo-client.properties "$pt_tmp"
-      CLASSPATH="$CP" java -Dlog4j.configuration="file:$log4j_config"  ${perf_pkg}.PerfTestRunner "$pt_tmp/accumulo-client.properties" "$test_class" "$(get_version 'ACCUMULO')" "$2"
+      if [[ -z $3 || $test_class == *$3* ]]; then
+        echo "Running test $test_class"
+        pt_tmp=$(mktemp -d -t accumulo_pt_XXXXXXX)
+        setup_accumulo
+        get_config_file accumulo-site.xml "$pt_tmp"
+        CLASSPATH="$CP" java -Dlog4j.configuration="file:$log4j_config"  ${perf_pkg}.MergeSiteConfig "$test_class" "$pt_tmp"
+        put_config_file "$pt_tmp/accumulo-site.xml"
+        put_server_code "$at_home/core/target/accumulo-testing-core-$at_version.jar"
+        start_accumulo
+        get_config_file accumulo-client.properties "$pt_tmp"
+        CLASSPATH="$CP" java -Dlog4j.configuration="file:$log4j_config"  ${perf_pkg}.PerfTestRunner "$pt_tmp/accumulo-client.properties" "$test_class" "$(get_version 'ACCUMULO')" "$2"
+      fi
     done
     stop_cluster
     ;;
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/performance/tests/RandomCachedLookupsPT.java b/core/src/main/java/org/apache/accumulo/testing/core/performance/tests/RandomCachedLookupsPT.java
new file mode 100644
index 0000000..e3f7b9b
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/performance/tests/RandomCachedLookupsPT.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.core.performance.tests;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.testing.core.performance.Environment;
+import org.apache.accumulo.testing.core.performance.PerformanceTest;
+import org.apache.accumulo.testing.core.performance.Report;
+import org.apache.accumulo.testing.core.performance.Report.Builder;
+import org.apache.accumulo.testing.core.performance.SystemConfiguration;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterables;
+
+public class RandomCachedLookupsPT implements PerformanceTest {
+
+  private static final int NUM_LOOKUPS_PER_THREAD = 25000;
+  private static final int NUM_ROWS = 100000;
+
+  @Override
+  public SystemConfiguration getConfiguration() {
+    Map<String,String> siteCfg = new HashMap<>();
+
+    siteCfg.put(Property.TSERV_SCAN_MAX_OPENFILES.getKey(), "1000");
+    siteCfg.put(Property.TSERV_MINTHREADS.getKey(), "256");
+    siteCfg.put(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getKey(), "32");
+    siteCfg.put(Property.TABLE_DURABILITY.getKey(), "flush");
+    siteCfg.put(Property.TSERV_DATACACHE_SIZE.getKey(), "2G");
+    siteCfg.put(Property.TSERV_INDEXCACHE_SIZE.getKey(), "1G");
+
+    // TODO it would be good if this could request a minimum amount of tserver memory
+
+    return new SystemConfiguration().setAccumuloConfig(siteCfg);
+  }
+
+  @Override
+  public Report runTest(Environment env) throws Exception {
+    Builder reportBuilder = Report.builder();
+
+    writeData(reportBuilder, env.getConnector(), NUM_ROWS);
+
+    long warmup = doLookups(env.getConnector(), 32, NUM_LOOKUPS_PER_THREAD);
+
+    long d1 = doLookups(env.getConnector(), 1, NUM_LOOKUPS_PER_THREAD);
+    long d4 = doLookups(env.getConnector(), 4, NUM_LOOKUPS_PER_THREAD);
+    long d8 = doLookups(env.getConnector(), 8, NUM_LOOKUPS_PER_THREAD);
+    long d16 = doLookups(env.getConnector(), 16, NUM_LOOKUPS_PER_THREAD);
+    long d32 = doLookups(env.getConnector(), 32, NUM_LOOKUPS_PER_THREAD);
+    long d64 = doLookups(env.getConnector(), 64, NUM_LOOKUPS_PER_THREAD);
+    long d128 = doLookups(env.getConnector(), 128, NUM_LOOKUPS_PER_THREAD);
+
+    reportBuilder.id("smalls");
+    reportBuilder.description("Runs multiple threads each doing lots of small random scans.  For this test data and index cache are enabled.");
+    reportBuilder.info("warmup", 32 * NUM_LOOKUPS_PER_THREAD, warmup, "Random lookup per sec for 32 threads");
+    reportBuilder.info("lookups_1", NUM_LOOKUPS_PER_THREAD, d1, "Random lookup per sec rate for 1 thread");
+    reportBuilder.info("lookups_4", 4 * NUM_LOOKUPS_PER_THREAD, d4, "Random lookup per sec rate for 4 threads");
+    reportBuilder.info("lookups_8", 8 * NUM_LOOKUPS_PER_THREAD, d8, "Random lookup per sec rate for 8 threads");
+    reportBuilder.info("lookups_16", 16 * NUM_LOOKUPS_PER_THREAD, d16, "Random lookup per sec rate for 16 threads");
+    reportBuilder.info("lookups_32", 32 * NUM_LOOKUPS_PER_THREAD, d32, "Random lookup per sec rate for 32 threads");
+    reportBuilder.info("lookups_64", 64 * NUM_LOOKUPS_PER_THREAD, d64, "Random lookup per sec rate for 64 threads");
+    reportBuilder.info("lookups_128", 128 * NUM_LOOKUPS_PER_THREAD, d128, "Random lookup per sec rate for 128 threads");
+
+    reportBuilder.result("avg_1", d1 / (double) NUM_LOOKUPS_PER_THREAD, "Average milliseconds per lookup for 1 thread");
+    reportBuilder.result("avg_4", d4 / (double) NUM_LOOKUPS_PER_THREAD, "Average milliseconds per lookup for 4 threads");
+    reportBuilder.result("avg_8", d8 / (double) NUM_LOOKUPS_PER_THREAD, "Average milliseconds per lookup for 8 threads");
+    reportBuilder.result("avg_16", d16 / (double) NUM_LOOKUPS_PER_THREAD, "Average milliseconds per lookup for 16 threads");
+    reportBuilder.result("avg_32", d32 / (double) NUM_LOOKUPS_PER_THREAD, "Average milliseconds per lookup for 32 threads");
+    reportBuilder.result("avg_64", d64 / (double) NUM_LOOKUPS_PER_THREAD, "Average milliseconds per lookup for 64 threads");
+    reportBuilder.result("avg_128", d128 / (double) NUM_LOOKUPS_PER_THREAD, "Average milliseconds per lookup for 128 threads");
+
+    return reportBuilder.build();
+  }
+
+  public static void writeData(Builder reportBuilder, Connector conn, int numRows) throws Exception {
+
+    reportBuilder.parameter("rows", numRows, "Number of random rows written.  Each row has 4 columns.");
+
+    NewTableConfiguration ntc = new NewTableConfiguration();
+    Map<String,String> props = new HashMap<>();
+    props.put("table.file.compress.blocksize.index", "256K");
+    props.put("table.file.compress.blocksize", "8K");
+    props.put("table.cache.index.enable", "true");
+    props.put("table.cache.block.enable", "true");
+    ntc.setProperties(props);
+
+    long t1 = System.currentTimeMillis();
+    try {
+      conn.tableOperations().create("scanpt", ntc);
+    } catch (TableExistsException tee) {
+      conn.tableOperations().delete("scanpt");
+      conn.tableOperations().create("scanpt", ntc);
+    }
+
+    long t2 = System.currentTimeMillis();
+
+    SortedSet<Text> partitionKeys = new TreeSet<>(
+        Stream.of("1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f").map(Text::new).collect(toList()));
+    conn.tableOperations().addSplits("scanpt", partitionKeys);
+
+    long t3 = System.currentTimeMillis();
+
+    BatchWriter bw = conn.createBatchWriter("scanpt", new BatchWriterConfig());
+
+    Random rand = new Random();
+
+    for (int i = 0; i < numRows; i++) {
+      Mutation m = new Mutation(toHex(rand.nextLong()));
+      int c1 = rand.nextInt(1 << 10);
+      int c2 = rand.nextInt(1 << 10);
+      while (c1 == c2)
+        c2 = rand.nextInt(1 << 10);
+      int c3 = rand.nextInt(1 << 10);
+      while (c1 == c3 || c2 == c3)
+        c3 = rand.nextInt(1 << 10);
+      int c4 = rand.nextInt(1 << 10);
+      while (c1 == c4 || c2 == c4 || c3 == c4)
+        c4 = rand.nextInt(1 << 10);
+      m.put("fam1", toHex(c1, 3), toHex(rand.nextLong()));
+      m.put("fam1", toHex(c2, 3), toHex(rand.nextLong()));
+      m.put("fam1", toHex(c3, 3), toHex(rand.nextLong()));
+      m.put("fam1", toHex(c4, 3), toHex(rand.nextLong()));
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    long t4 = System.currentTimeMillis();
+
+    conn.tableOperations().compact("scanpt", new CompactionConfig().setFlush(true).setWait(true));
+
+    long t5 = System.currentTimeMillis();
+
+    try (Scanner scanner = conn.createScanner("scanpt", Authorizations.EMPTY)) {
+      // scan entire table to bring it into cache
+      Iterables.size(scanner);
+    }
+
+    long t6 = System.currentTimeMillis();
+
+    reportBuilder.info("create", t2 - t1, "Time to create table in ms");
+    reportBuilder.info("split", t3 - t2, "Time to split table in ms");
+    reportBuilder.info("write", 4 * numRows, t4 - t3, "Rate to write data in entries/sec");
+    reportBuilder.info("compact", 4 * numRows, t5 - t4, "Rate to compact table in entries/sec");
+    reportBuilder.info("fullScan", 4 * numRows, t6 - t5, "Rate to do full table scan in entries/sec");
+  }
+
+  private static long doLookups(Connector conn, int numThreads, int numScansPerThread) throws Exception {
+
+    ExecutorService es = Executors.newFixedThreadPool(numThreads);
+
+    List<Future<?>> futures = new ArrayList<>(numThreads);
+
+    long t1 = System.currentTimeMillis();
+
+    for (int i = 0; i < numThreads; i++) {
+      futures.add(es.submit(() -> doLookups(conn, numScansPerThread)));
+    }
+
+    for (Future<?> future : futures) {
+      future.get();
+    }
+
+    long t2 = System.currentTimeMillis();
+
+    es.shutdown();
+
+    return t2 -t1;
+  }
+
+  private static void doLookups(Connector conn, int numScans) {
+    try {
+      Random rand = new Random();
+
+      for (int i = 0; i < numScans; i++) {
+        Scanner scanner = conn.createScanner("scanpt", Authorizations.EMPTY);
+
+        scanner.setRange(new Range(toHex(rand.nextLong())));
+
+        Iterables.size(scanner);
+
+        scanner.close();
+      }
+    } catch (TableNotFoundException e) {
+      e.printStackTrace();
+    }
+  }
+
+  public static String toHex(long l) {
+    String s = Long.toHexString(l);
+    return Strings.padStart(s, 16, '0');
+  }
+
+  public static String toHex(int i) {
+    String s = Integer.toHexString(i);
+    return Strings.padStart(s, 8, '0');
+  }
+
+  public static String toHex(int i, int len) {
+    String s = Integer.toHexString(i);
+    return Strings.padStart(s, len, '0');
+  }
+}


Mime
View raw message