apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From david...@apache.org
Subject apex-malhar git commit: APEXMALHAR-2329 #resolve #comment ManagedState benchmark should not use constant bucket
Date Wed, 09 Nov 2016 23:05:20 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master 18e49df62 -> 28b89176c


APEXMALHAR-2329 #resolve #comment ManagedState benchmark should not use constant bucket


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/28b89176
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/28b89176
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/28b89176

Branch: refs/heads/master
Commit: 28b89176ce5468f04371dd19d9e49fe030f44248
Parents: 18e49df
Author: brightchen <bright@datatorrent.com>
Authored: Mon Nov 7 22:07:41 2016 -0800
Committer: brightchen <bright@datatorrent.com>
Committed: Wed Nov 9 13:23:19 2016 -0800

----------------------------------------------------------------------
 .../benchmark/state/StoreOperator.java          | 86 +++++++++++++-------
 .../src/main/resources/META-INF/properties.xml  |  2 +-
 .../state/ManagedStateBenchmarkAppTest.java     |  6 +-
 3 files changed, 62 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28b89176/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
index 2748c29..ad92b60 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
@@ -42,27 +42,28 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
 {
   private static final Logger logger = LoggerFactory.getLogger(StoreOperator.class);
 
-  public static enum ExecMode
+  public enum ExecMode
   {
     INSERT,
-    UPDATESYNC,
-    UPDATEASYNC
+    UPDATE_SYNC,
+    UPDATE_ASYNC,
+    GET_SYNC,
+    DO_NOTHING
   }
 
-  protected static final int numOfWindowPerStatistics = 10;
+  private static final int numOfWindowPerStatistics = 120;
 
   //this is the store we are going to use
-  protected ManagedTimeUnifiedStateImpl store;
-  protected long bucketId = 1;
+  private ManagedTimeUnifiedStateImpl store;
 
-  protected long lastCheckPointWindowId = -1;
-  protected long currentWindowId;
-  protected long tupleCount = 0;
-  protected int windowCountPerStatistics = 0;
-  protected long statisticsBeginTime = 0;
+  private long lastCheckPointWindowId = -1;
+  private long currentWindowId;
+  private long tupleCount = 0;
+  private int windowCountPerStatistics = 0;
+  private long statisticsBeginTime = 0;
 
-  protected ExecMode execMode = ExecMode.INSERT;
-  protected int timeRange = 1000 * 60;
+  private ExecMode execMode = ExecMode.INSERT;
+  private int timeRange = 1000 * 60;
 
   public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input =
new DefaultInputPort<KeyValPair<byte[], byte[]>>()
   {
@@ -81,6 +82,11 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
   }
 
   @Override
+  public void teardown()
+  {
+  }
+
+  @Override
   public void beginWindow(long windowId)
   {
     currentWindowId = windowId;
@@ -100,45 +106,70 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
     }
   }
 
-  protected transient Queue<Future<Slice>> taskQueue = new LinkedList<Future<Slice>>();
-  protected transient Map<Future<Slice>, KeyValPair<byte[], byte[]>> taskToPair
= Maps.newHashMap();
+  private transient Queue<Future<Slice>> taskQueue = new LinkedList<Future<Slice>>();
+  private transient Map<Future<Slice>, KeyValPair<byte[], byte[]>> taskToPair
= Maps.newHashMap();
 
   /**
    * we verify 3 type of operation
    * @param tuple
    */
-  protected void processTuple(KeyValPair<byte[], byte[]> tuple)
+  private Slice keySliceForRead = new Slice(null, 0, 0);
+  private void processTuple(KeyValPair<byte[], byte[]> tuple)
   {
     switch (execMode) {
-      case UPDATEASYNC:
+      case UPDATE_ASYNC:
         //handle it specially
         updateAsync(tuple);
         break;
 
-      case UPDATESYNC:
-        store.getSync(getTimeByKey(tuple.getKey()), new Slice(tuple.getKey()));
+
+      case UPDATE_SYNC:
+        keySliceForRead.buffer = tuple.getKey();
+        keySliceForRead.offset = 0;
+        keySliceForRead.length = tuple.getKey().length;
+        store.getSync(getTimeByKey(tuple.getKey()), keySliceForRead);
+
         insertValueToStore(tuple);
         break;
 
+      case GET_SYNC:
+        store.getSync(getTimeByKey(tuple.getKey()), new Slice(tuple.getKey()));
+        break;
+
+      case DO_NOTHING:
+        break;
+
       default: //insert
         insertValueToStore(tuple);
     }
+
+    ++tupleCount;
   }
 
-  protected long getTimeByKey(byte[] key)
+  private transient long sameKeyCount = 0;
+  private transient long preKey = -1;
+  private long getTimeByKey(byte[] key)
   {
     long lKey = ByteBuffer.wrap(key).getLong();
-    return lKey - (lKey % timeRange);
+    lKey = lKey - (lKey % timeRange);
+    if (preKey == lKey) {
+      sameKeyCount++;
+    } else {
+      logger.info("key: {} count: {}", preKey, sameKeyCount);
+      preKey = lKey;
+      sameKeyCount = 1;
+    }
+    return lKey;
   }
 
   // give a barrier to avoid used up memory
-  protected final int taskBarrier = 100000;
+  private final int taskBarrier = 100000;
 
   /**
    * This method first send request of get to the state manager, then handle all the task(get)
which already done and update the value.
    * @param tuple
    */
-  protected void updateAsync(KeyValPair<byte[], byte[]> tuple)
+  private void updateAsync(KeyValPair<byte[], byte[]> tuple)
   {
     if (taskQueue.size() > taskBarrier) {
       //slow down to avoid too much task waiting.
@@ -172,13 +203,12 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
     }
   }
 
-  protected void insertValueToStore(KeyValPair<byte[], byte[]> tuple)
+  private void insertValueToStore(KeyValPair<byte[], byte[]> tuple)
   {
     Slice key = new Slice(tuple.getKey());
     Slice value = new Slice(tuple.getValue());
 
-    store.put(bucketId, key, value);
-    ++tupleCount;
+    store.put(System.currentTimeMillis(), key, value);
   }
 
   @Override
@@ -209,10 +239,10 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
     this.store = store;
   }
 
-  protected void logStatistics()
+  private void logStatistics()
   {
     long spentTime = System.currentTimeMillis() - statisticsBeginTime;
-    logger.info("Time Spent: {}, Processed tuples: {}, rate per second: {}", spentTime, tupleCount,
tupleCount * 1000 / spentTime);
+    logger.info("Windows: {}; Time Spent: {}, Processed tuples: {}, rate per second: {}",
windowCountPerStatistics, spentTime, tupleCount, tupleCount * 1000 / spentTime);
 
     statisticsBeginTime = System.currentTimeMillis();
     tupleCount = 0;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28b89176/benchmark/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/benchmark/src/main/resources/META-INF/properties.xml b/benchmark/src/main/resources/META-INF/properties.xml
index b6131e4..aec92d4 100644
--- a/benchmark/src/main/resources/META-INF/properties.xml
+++ b/benchmark/src/main/resources/META-INF/properties.xml
@@ -195,7 +195,7 @@
   </property>
   <property>
     <name>dt.application.ManagedStateBenchmark.operator.Store.execModeStr</name>
-    <!-- valid value are: INSERT, UPDATESYNC, UPDATEASYNC -->
+    <!-- valid value are: INSERT, UPDATE_SYNC, UPDATE_ASYNC -->
     <value>INSERT</value>
   </property>
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/28b89176/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
index 4792843..5279d36 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
@@ -49,13 +49,13 @@ public class ManagedStateBenchmarkAppTest extends ManagedStateBenchmarkApp
   @Test
   public void testUpdateSync() throws Exception
   {
-    test(ExecMode.UPDATESYNC);
+    test(ExecMode.UPDATE_SYNC);
   }
 
   @Test
   public void testUpdateAsync() throws Exception
   {
-    test(ExecMode.UPDATEASYNC);
+    test(ExecMode.UPDATE_ASYNC);
   }
 
   @Test
@@ -72,7 +72,7 @@ public class ManagedStateBenchmarkAppTest extends ManagedStateBenchmarkApp
     DAG dag = lma.getDAG();
 
     super.populateDAG(dag, conf);
-    storeOperator.execMode = exeMode;
+    storeOperator.setExecMode(exeMode);
 
     StreamingApplication app = new StreamingApplication()
     {


Mime
View raw message