apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [1/2] apex-malhar git commit: APEXMALHAR-2345 Fix MovingBoundaryTimeBucketAssigner initialization and purge trigger.
Date Mon, 23 Jan 2017 06:51:51 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master e22ea0de1 -> 1ae14c03a


APEXMALHAR-2345 Fix MovingBoundaryTimeBucketAssigner initialization and purge trigger.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/1ae14c03
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/1ae14c03
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/1ae14c03

Branch: refs/heads/master
Commit: 1ae14c03a73e2814b57ce06680db57da80f0b42b
Parents: 9043f9d
Author: Thomas Weise <thw@apache.org>
Authored: Sun Jan 22 13:19:44 2017 -0800
Committer: David Yan <davidyan@apache.org>
Committed: Sun Jan 22 21:45:28 2017 -0800

----------------------------------------------------------------------
 .../state/managed/AbstractManagedStateImpl.java |  3 +++
 .../managed/IncrementalCheckpointManager.java   |  1 +
 .../MovingBoundaryTimeBucketAssigner.java       |  5 ++--
 .../lib/state/managed/ManagedStateImplTest.java |  4 ++--
 .../MovingBoundaryTimeBucketAssignerTest.java   | 25 ++++++++++++++++++++
 5 files changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1ae14c03/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index daae2d8..f676b84 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -220,6 +220,8 @@ public abstract class AbstractManagedStateImpl
               buckets.get(bucketIdx).recoveredData(stateEntry.getKey(), bucketEntry.getValue());
             }
           }
+          // Skip write to WAL during recovery during replay from WAL.
+          // Data only needs to be transferred to bucket data files.
           checkpointManager.save(state, stateEntry.getKey(), true /*skipWritingToWindowFile*/);
         }
       } catch (IOException e) {
@@ -369,6 +371,7 @@ public abstract class AbstractManagedStateImpl
     }
     if (!flashData.isEmpty()) {
       try {
+        // write incremental state to WAL (skipWrite=false) before the checkpoint
         checkpointManager.save(flashData, windowId, false);
       } catch (IOException e) {
         throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1ae14c03/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
index 65c1d1e..aa7cec7 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
@@ -111,6 +111,7 @@ public class IncrementalCheckpointManager extends FSWindowDataManager
           if (latestExpiredTimeBucket.get() > -1) {
             try {
               latestPurgedTimeBucket = latestExpiredTimeBucket.getAndSet(-1);
+              //LOG.debug("latestPurgedTimeBucket {}", latestPurgedTimeBucket);
               managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo(latestPurgedTimeBucket);
             } catch (IOException e) {
               throwable.set(e);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1ae14c03/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
index ece7686..cc8ea0a 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssigner.java
@@ -67,7 +67,7 @@ public class MovingBoundaryTimeBucketAssigner extends TimeBucketAssigner
   private int numBuckets;
   private transient long fixedStart;
   private transient boolean triggerPurge;
-  private transient long lowestPurgeableTimeBucket;
+  private transient long lowestPurgeableTimeBucket = -1;
 
 
   @Override
@@ -125,8 +125,9 @@ public class MovingBoundaryTimeBucketAssigner extends TimeBucketAssigner
       long move = (diffInBuckets + 1) * bucketSpanMillis;
       start += move;
       end += move;
-      triggerPurge = true;
       lowestPurgeableTimeBucket += diffInBuckets;
+      // trigger purge when lower bound changes
+      triggerPurge = (diffInBuckets > 0);
     }
     return key;
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1ae14c03/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
index 99e6c23..dab3925 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
@@ -177,7 +177,7 @@ public class ManagedStateImplTest
     testMeta.managedState.setup(testMeta.operatorContext);
 
     int numKeys = 300;
-    long lastWindowId = (long)numKeys;
+    long lastWindowId = numKeys;
 
     for (long windowId = 0L; windowId < lastWindowId; windowId++) {
       testMeta.managedState.beginWindow(windowId);
@@ -197,7 +197,7 @@ public class ManagedStateImplTest
     for (int key = numKeys - 1; key > 0; key--) {
       Slice keyVal = ManagedStateTestUtils.getSliceFor(Integer.toString(key));
       Slice val = testMeta.managedState.getSync(0L, keyVal);
-      Assert.assertNotNull(val);
+      Assert.assertNotNull("null value for key " + key, val);
     }
 
     testMeta.managedState.endWindow();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1ae14c03/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
index e4e5d2e..2b132f4 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MovingBoundaryTimeBucketAssignerTest.java
@@ -28,6 +28,8 @@ import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import org.apache.commons.lang3.mutable.MutableLong;
+
 import com.datatorrent.lib.util.KryoCloneUtils;
 
 public class MovingBoundaryTimeBucketAssignerTest
@@ -96,20 +98,43 @@ public class MovingBoundaryTimeBucketAssignerTest
   @Test
   public void testTimeBucketKeyExpiry()
   {
+    final MutableLong purgeLessThanEqualTo = new MutableLong(-2);
     testMeta.timeBucketAssigner.setExpireBefore(Duration.standardSeconds(1));
     testMeta.timeBucketAssigner.setBucketSpan(Duration.standardSeconds(1));
+    testMeta.timeBucketAssigner.setPurgeListener(new TimeBucketAssigner.PurgeListener()
+    {
+      @Override
+      public void purgeTimeBucketsLessThanEqualTo(long timeBucket)
+      {
+        purgeLessThanEqualTo.setValue(timeBucket);
+      }
+    });
 
     long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis();
     testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext);
+    Assert.assertEquals("purgeLessThanEqualTo", -2L, purgeLessThanEqualTo.longValue());
+
+    long time0 = Duration.standardSeconds(0).getMillis() + referenceTime;
+    Assert.assertEquals("time bucket", 1, testMeta.timeBucketAssigner.getTimeBucket(time0));
+    testMeta.timeBucketAssigner.endWindow();
+    Assert.assertEquals("purgeLessThanEqualTo", -2, purgeLessThanEqualTo.longValue());
 
     long time1 = Duration.standardSeconds(9).getMillis() + referenceTime;
     Assert.assertEquals("time bucket", 10, testMeta.timeBucketAssigner.getTimeBucket(time1));
+    testMeta.timeBucketAssigner.endWindow();
+    Assert.assertEquals("purgeLessThanEqualTo", 7, purgeLessThanEqualTo.longValue());
+    purgeLessThanEqualTo.setValue(-2);
 
     long time2 = Duration.standardSeconds(10).getMillis()  + referenceTime;
     Assert.assertEquals("time bucket", 11, testMeta.timeBucketAssigner.getTimeBucket(time2));
+    testMeta.timeBucketAssigner.endWindow();
+// TODO: why is purgeLessThanEqualTo not moving to 8 here?
+    Assert.assertEquals("purgeLessThanEqualTo", -2, purgeLessThanEqualTo.longValue());
 
     //Check for expiry of time1 now
     Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucket(time1));
+    testMeta.timeBucketAssigner.endWindow();
+    Assert.assertEquals("purgeLessThanEqualTo", -2, purgeLessThanEqualTo.longValue());
 
     testMeta.timeBucketAssigner.teardown();
   }


Mime
View raw message