accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mwa...@apache.org
Subject [accumulo-testing] branch 1.9 updated: Added random pausing to continuous ingest (#15)
Date Thu, 05 Jul 2018 15:09:28 GMT
This is an automated email from the ASF dual-hosted git repository.

mwalch pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/1.9 by this push:
     new 288a34e  Added random pausing to continuous ingest (#15)
288a34e is described below

commit 288a34ea653b22f256267512c2fbd3b4d0edf473
Author: Mike Walch <mwalch@apache.org>
AuthorDate: Thu Jul 5 11:09:26 2018 -0400

    Added random pausing to continuous ingest (#15)
---
 conf/accumulo-testing.properties.example           | 10 ++++
 .../apache/accumulo/testing/core/TestProps.java    | 10 ++++
 .../testing/core/continuous/ContinuousIngest.java  | 61 ++++++++++++++++++++--
 3 files changed, 78 insertions(+), 3 deletions(-)

diff --git a/conf/accumulo-testing.properties.example b/conf/accumulo-testing.properties.example
index 7468443..4fff104 100644
--- a/conf/accumulo-testing.properties.example
+++ b/conf/accumulo-testing.properties.example
@@ -77,6 +77,16 @@ test.ci.ingest.max.cq=32767
 test.ci.ingest.visibilities=
 # Checksums will be generated during ingest if set to true
 test.ci.ingest.checksum=true
+# Enables periodic pausing of ingest
+test.ci.ingest.pause.enabled=false
+# Minimum wait between ingest pauses (in seconds)
+test.ci.ingest.pause.wait.min=120
+# Maximum wait between ingest pauses (in seconds)
+test.ci.ingest.pause.wait.max=180
+# Minimum pause duration (in seconds)
+test.ci.ingest.pause.duration.min=60
+# Maximum pause duration (in seconds)
+test.ci.ingest.pause.duration.max=120
 
 # Batch walker
 # ------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/TestProps.java b/core/src/main/java/org/apache/accumulo/testing/core/TestProps.java
index b0927db..638d2db 100644
--- a/core/src/main/java/org/apache/accumulo/testing/core/TestProps.java
+++ b/core/src/main/java/org/apache/accumulo/testing/core/TestProps.java
@@ -92,6 +92,16 @@ public class TestProps {
   public static final String CI_INGEST_VISIBILITIES = CI_INGEST + "visibilities";
   // Checksums will be generated during ingest if set to true
   public static final String CI_INGEST_CHECKSUM = CI_INGEST + "checksum";
+  // Enables periodic pausing of ingest
+  public static final String CI_INGEST_PAUSE_ENABLED = CI_INGEST + "pause.enabled";
+  // Minimum wait between ingest pauses (in seconds)
+  public static final String CI_INGEST_PAUSE_WAIT_MIN = CI_INGEST + "pause.wait.min";
+  // Maximum wait between ingest pauses (in seconds)
+  public static final String CI_INGEST_PAUSE_WAIT_MAX = CI_INGEST + "pause.wait.max";
+  // Minimum pause duration (in seconds)
+  public static final String CI_INGEST_PAUSE_DURATION_MIN = CI_INGEST + "pause.duration.min";
+  // Maximum pause duration (in seconds)
+  public static final String CI_INGEST_PAUSE_DURATION_MAX = CI_INGEST + "pause.duration.max";
 
   /** Batch Walker **/
   // Sleep time between batch scans (in ms)
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java
b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java
index db281a7..4afd00c 100644
--- a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java
@@ -24,9 +24,11 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
+import com.google.common.base.Preconditions;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -39,17 +41,62 @@ import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.core.util.FastFormat;
 import org.apache.accumulo.testing.core.TestProps;
 import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ContinuousIngest {
 
+  private static final Logger log = LoggerFactory.getLogger(ContinuousIngest.class);
+
   private static final byte[] EMPTY_BYTES = new byte[0];
 
   private static List<ColumnVisibility> visibilities;
+  private static long lastPauseNs;
+  private static long pauseWaitSec;
 
   private static ColumnVisibility getVisibility(Random rand) {
     return visibilities.get(rand.nextInt(visibilities.size()));
   }
 
+  private static boolean pauseEnabled(Properties props) {
+    String value = props.getProperty(TestProps.CI_INGEST_PAUSE_ENABLED);
+    return Boolean.parseBoolean(value);
+  }
+
+  private static int getPauseWaitSec(Properties props, Random rand) {
+    int waitMin = Integer.parseInt(props.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MIN));
+    int waitMax = Integer.parseInt(props.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MAX));
+    Preconditions.checkState(waitMax >= waitMin && waitMin > 0);
+    if (waitMax == waitMin) {
+      return waitMin;
+    }
+    return (rand.nextInt(waitMax - waitMin) + waitMin);
+  }
+
+  private static int getPauseDurationSec(Properties props, Random rand) {
+    int durationMin = Integer.parseInt(props.getProperty(TestProps.CI_INGEST_PAUSE_DURATION_MIN));
+    int durationMax = Integer.parseInt(props.getProperty(TestProps.CI_INGEST_PAUSE_DURATION_MAX));
+    Preconditions.checkState(durationMax >= durationMin && durationMin > 0);
+    if (durationMax == durationMin) {
+      return durationMin;
+    }
+    return (rand.nextInt(durationMax - durationMin) + durationMin);
+  }
+
+  private static void pauseCheck(Properties props, Random rand) throws InterruptedException
{
+    if (pauseEnabled(props)) {
+      long elapsedNano = System.nanoTime() - lastPauseNs;
+      if (elapsedNano > (TimeUnit.SECONDS.toNanos(pauseWaitSec))) {
+        long pauseDurationSec = getPauseDurationSec(props, rand);
+        log.info("PAUSING for " + pauseDurationSec + "s");
+        Thread.sleep(TimeUnit.SECONDS.toMillis(pauseDurationSec));
+        lastPauseNs = System.nanoTime();
+        pauseWaitSec = getPauseWaitSec(props, rand);
+        log.info("INGESTING for " + pauseWaitSec + "s");
+      }
+    }
+  }
+
   public static void main(String[] args) throws Exception {
 
     if (args.length != 1) {
@@ -90,7 +137,7 @@ public class ContinuousIngest {
 
     byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
 
-    System.out.printf("UUID %d %s%n", System.currentTimeMillis(), new String(ingestInstanceId,
UTF_8));
+    log.info(String.format("UUID %d %s", System.currentTimeMillis(), new String(ingestInstanceId,
UTF_8)));
 
     long count = 0;
     final int flushInterval = 1000000;
@@ -115,6 +162,13 @@ public class ContinuousIngest {
     boolean checksum = Boolean.parseBoolean(props.getProperty(TestProps.CI_INGEST_CHECKSUM));
     long numEntries = Long.parseLong(props.getProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
 
+    if (pauseEnabled(props)) {
+      lastPauseNs = System.nanoTime();
+      pauseWaitSec = getPauseWaitSec(props, r);
+      log.info("PAUSING enabled");
+      log.info("INGESTING for " + pauseWaitSec + "s");
+    }
+
     out: while (true) {
       // generate first set of nodes
       ColumnVisibility cv = getVisibility(r);
@@ -154,6 +208,7 @@ public class ContinuousIngest {
         lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
         if (count >= numEntries)
           break out;
+        pauseCheck(props, r);
       }
 
       // create one big linked list, this makes all of the first inserts
@@ -167,8 +222,8 @@ public class ContinuousIngest {
       lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
       if (count >= numEntries)
         break out;
+      pauseCheck(props, r);
     }
-
     bw.close();
   }
 
@@ -176,7 +231,7 @@ public class ContinuousIngest {
     long t1 = System.currentTimeMillis();
     bw.flush();
     long t2 = System.currentTimeMillis();
-    System.out.printf("FLUSH %d %d %d %d %d%n", t2, (t2 - lastFlushTime), (t2 - t1), count,
flushInterval);
+    log.info(String.format("FLUSH %d %d %d %d %d", t2, (t2 - lastFlushTime), (t2 - t1), count,
flushInterval));
     lastFlushTime = t2;
     return lastFlushTime;
   }


Mime
View raw message