apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pra...@apache.org
Subject [1/2] incubator-apex-malhar git commit: MLHR-1841 #comment rotating all files irrespective of the state of stream
Date Wed, 09 Sep 2015 15:10:25 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 d910102d7 -> 59f21fb5d


MLHR-1841 #comment rotating all files irrespective of the state of stream


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b8a10bc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b8a10bc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b8a10bc5

Branch: refs/heads/devel-3
Commit: b8a10bc5381bd8a004a2af4f5c0812e379c82d44
Parents: d910102
Author: Chandni Singh <csingh@apache.org>
Authored: Tue Sep 8 14:21:04 2015 -0700
Committer: Chandni Singh <csingh@apache.org>
Committed: Tue Sep 8 14:52:34 2015 -0700

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java   |  4 +--
 .../io/fs/AbstractFileOutputOperatorTest.java   | 38 ++++++++++++++++++++
 2 files changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b8a10bc5/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index aef0739..8339cc1 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -868,9 +868,9 @@ public abstract class AbstractFileOutputOperator<INPUT> extends
BaseOperator imp
       if (++rotationCount == rotationWindows) {
         rotationCount = 0;
         // Rotate the files
-        Iterator<String> iterator = streamsCache.asMap().keySet().iterator();
+        Iterator<Map.Entry<String, MutableInt>> iterator = openPart.entrySet().iterator();
         while (iterator.hasNext()) {
-          String filename = iterator.next();
+          String filename = iterator.next().getKey();
           // Rotate the file if the following conditions are met
           // 1. The file is not already rotated during this period for other reasons such
as max length is reached
           //     or rotate was explicitly called externally

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b8a10bc5/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index b900af2..dd6bb1d 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -1809,6 +1809,44 @@ public class AbstractFileOutputOperatorTest
   }
 
   @Test
+  public void testPeriodicRotationWithEviction() throws InterruptedException
+  {
+    EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
+    File dir = new File(testMeta.getDir());
+    writer.setFilePath(testMeta.getDir());
+    writer.setRotationWindows(30);
+    writer.setAlwaysWriteToTmp(true);
+    writer.setExpireStreamAfterAccessMillis(1L);
+    writer.setup(testMeta.testOperatorContext);
+
+    // Check that rotation for even.txt.0 happens
+    for (int i = 0; i < 30; ++i) {
+      writer.beginWindow(i);
+      if (i == 0) {
+        writer.input.put(i);
+      }
+      Thread.sleep(100L);
+      writer.endWindow();
+    }
+    writer.committed(29);
+    Set<String> fileNames = new TreeSet<>();
+    fileNames.add(EVEN_FILE + ".0");
+    Collection<File> files = FileUtils.listFiles(dir, null, false);
+    Assert.assertEquals("Number of part files", 1, files.size());
+    Assert.assertEquals("Part file names", fileNames, getFileNames(files));
+
+    // Check that rotation doesn't happen for files that don't have data during the rotation
period
+    for (int i = 30; i < 120; ++i) {
+      writer.beginWindow(i);
+      writer.endWindow();
+    }
+    writer.committed(119);
+    files = FileUtils.listFiles(dir, null, false);
+    Assert.assertEquals("Number of part files", 1, files.size());
+    Assert.assertEquals("Part file names", fileNames, getFileNames(files));
+  }
+
+  @Test
   public void testCompression() throws IOException
   {
     EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();


Mime
View raw message