apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From csi...@apache.org
Subject [1/3] incubator-apex-malhar git commit: APEXMALHAR-2065 added getWindowIds method to Window Data Manager
Date Tue, 17 May 2016 18:55:14 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 1d3e20c69 -> 5d3b209fc


APEXMALHAR-2065 added getWindowIds method to Window Data Manager


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/2a6e1a6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/2a6e1a6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/2a6e1a6b

Branch: refs/heads/master
Commit: 2a6e1a6b50413f6e2c8c2ced55f4c10b562375f1
Parents: 4ef0700
Author: Timothy Farkas <tim@datatorrent.com>
Authored: Sun Apr 24 18:39:24 2016 -0700
Committer: Chandni Singh <csingh@apache.org>
Committed: Tue May 17 11:41:47 2016 -0700

----------------------------------------------------------------------
 .../apex/malhar/lib/wal/WindowDataManager.java  | 33 ++++++++++++++++++++
 .../malhar/lib/wal/FSWindowDataManagerTest.java | 19 +++++++++++
 2 files changed, 52 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2a6e1a6b/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java
index fd7948a..296238b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
 
 import javax.validation.constraints.NotNull;
 
@@ -99,6 +100,16 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
   void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds);
 
   /**
+   * Returns an array of windowIds for which data was stored by atleast one partition. The
array
+   * of winodwIds is sorted.
+   *
+   * @return An array of windowIds for which data was stored by atleast one partition. The
array
+   * of winodwIds is sorted.
+   * @throws IOException
+   */
+  long[] getWindowIds() throws IOException;
+
+  /**
    * An {@link WindowDataManager} that uses FS to persist state.
    */
   class FSWindowDataManager implements WindowDataManager
@@ -227,6 +238,22 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
       return storageAgent.getWindowIds(operatorId);
     }
 
+    @Override
+    public long[] getWindowIds() throws IOException
+    {
+      SortedSet<Long> windowIds = replayState.keySet();
+      long[] windowIdsArray = new long[windowIds.size()];
+
+      int index = 0;
+
+      for (Long windowId: windowIds) {
+        windowIdsArray[index] = windowId;
+        index++;
+      }
+
+      return windowIdsArray;
+    }
+
     /**
      * This deletes all the recovery files of window ids <= windowId.
      *
@@ -436,5 +463,11 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
     {
       return new long[0];
     }
+
+    @Override
+    public long[] getWindowIds() throws IOException
+    {
+      return new long[0];
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2a6e1a6b/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java
b/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java
index dff061a..7f3adc9 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java
@@ -158,6 +158,25 @@ public class FSWindowDataManagerTest
   }
 
   @Test
+  public void testGetWindowIds() throws IOException
+  {
+    testMeta.storageManager.setup(testMeta.context);
+    Map<Integer, String> data = Maps.newHashMap();
+    data.put(1, "one");
+    data.put(2, "two");
+    data.put(3, "three");
+
+    testMeta.storageManager.save(data, 1, 1);
+    testMeta.storageManager.save(data, 2, 2);
+
+    testMeta.storageManager.setup(testMeta.context);
+
+    Assert.assertArrayEquals(new long[] {1, 2}, testMeta.storageManager.getWindowIds());
+
+    testMeta.storageManager.teardown();
+  }
+
+  @Test
   public void testDelete() throws IOException
   {
     testMeta.storageManager.setup(testMeta.context);


Mime
View raw message