Return-Path: X-Original-To: apmail-apex-commits-archive@minotaur.apache.org Delivered-To: apmail-apex-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 111BF17B7B for ; Wed, 9 Sep 2015 15:10:29 +0000 (UTC) Received: (qmail 91215 invoked by uid 500); 9 Sep 2015 15:10:29 -0000 Delivered-To: apmail-apex-commits-archive@apex.apache.org Received: (qmail 91182 invoked by uid 500); 9 Sep 2015 15:10:28 -0000 Mailing-List: contact commits-help@apex.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@apex.incubator.apache.org Delivered-To: mailing list commits@apex.incubator.apache.org Received: (qmail 91173 invoked by uid 99); 9 Sep 2015 15:10:28 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Sep 2015 15:10:28 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 7DF79C019D for ; Wed, 9 Sep 2015 15:10:28 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.774 X-Spam-Level: * X-Spam-Status: No, score=1.774 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.006] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id FrfaCL_312aP for ; Wed, 9 Sep 2015 15:10:27 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 1BC6020FAF for ; Wed, 9 Sep 2015 15:10:25 +0000 (UTC) Received: (qmail 91097 invoked by uid 99); 9 Sep 2015 15:10:25 -0000 Received: from git1-us-west.apache.org 
(HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Sep 2015 15:10:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1C402DFB87; Wed, 9 Sep 2015 15:10:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: pramod@apache.org To: commits@apex.incubator.apache.org Date: Wed, 09 Sep 2015 15:10:25 -0000 Message-Id: <4a1d7e3803d0450fae77a8d0fb487f1c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-apex-malhar git commit: MLHR-1841 #comment rotating all files irrespective of the state of stream Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 d910102d7 -> 59f21fb5d MLHR-1841 #comment rotating all files irrespective of the state of stream Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b8a10bc5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b8a10bc5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b8a10bc5 Branch: refs/heads/devel-3 Commit: b8a10bc5381bd8a004a2af4f5c0812e379c82d44 Parents: d910102 Author: Chandni Singh Authored: Tue Sep 8 14:21:04 2015 -0700 Committer: Chandni Singh Committed: Tue Sep 8 14:52:34 2015 -0700 ---------------------------------------------------------------------- .../lib/io/fs/AbstractFileOutputOperator.java | 4 +-- .../io/fs/AbstractFileOutputOperatorTest.java | 38 ++++++++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b8a10bc5/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java ---------------------------------------------------------------------- diff --git 
a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java index aef0739..8339cc1 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java @@ -868,9 +868,9 @@ public abstract class AbstractFileOutputOperator extends BaseOperator imp if (++rotationCount == rotationWindows) { rotationCount = 0; // Rotate the files - Iterator<String> iterator = streamsCache.asMap().keySet().iterator(); + Iterator<Map.Entry<String, MutableInt>> iterator = openPart.entrySet().iterator(); while (iterator.hasNext()) { - String filename = iterator.next(); + String filename = iterator.next().getKey(); // Rotate the file if the following conditions are met // 1. The file is not already rotated during this period for other reasons such as max length is reached // or rotate was explicitly called externally http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b8a10bc5/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java index b900af2..dd6bb1d 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java @@ -1809,6 +1809,44 @@ public class AbstractFileOutputOperatorTest } @Test + public void testPeriodicRotationWithEviction() throws InterruptedException + { + EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter(); + File dir = new File(testMeta.getDir()); + writer.setFilePath(testMeta.getDir()); + writer.setRotationWindows(30); + writer.setAlwaysWriteToTmp(true); + 
writer.setExpireStreamAfterAccessMillis(1L); + writer.setup(testMeta.testOperatorContext); + + // Check that rotation for even.txt.0 happens + for (int i = 0; i < 30; ++i) { + writer.beginWindow(i); + if (i == 0) { + writer.input.put(i); + } + Thread.sleep(100L); + writer.endWindow(); + } + writer.committed(29); + Set<String> fileNames = new TreeSet<>(); + fileNames.add(EVEN_FILE + ".0"); + Collection<File> files = FileUtils.listFiles(dir, null, false); + Assert.assertEquals("Number of part files", 1, files.size()); + Assert.assertEquals("Part file names", fileNames, getFileNames(files)); + + // Check that rotation doesn't happen for files that don't have data during the rotation period + for (int i = 30; i < 120; ++i) { + writer.beginWindow(i); + writer.endWindow(); + } + writer.committed(119); + files = FileUtils.listFiles(dir, null, false); + Assert.assertEquals("Number of part files", 1, files.size()); + Assert.assertEquals("Part file names", fileNames, getFileNames(files)); + } + + @Test public void testCompression() throws IOException { EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();