Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B108611C8D for ; Fri, 28 Mar 2014 22:56:35 +0000 (UTC) Received: (qmail 99031 invoked by uid 500); 28 Mar 2014 22:56:33 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 98892 invoked by uid 500); 28 Mar 2014 22:56:32 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 98872 invoked by uid 99); 28 Mar 2014 22:56:31 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Mar 2014 22:56:31 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 97468915CE3; Fri, 28 Mar 2014 22:56:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hshreedharan@apache.org To: commits@flume.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: FLUME-2350. Consume Order tests need to space out file creation. Date: Fri, 28 Mar 2014 22:56:31 +0000 (UTC) Repository: flume Updated Branches: refs/heads/trunk 61b9bcbb6 -> 62b383a00 FLUME-2350. Consume Order tests need to space out file creation. (Muhammad Ehsan ul Haque via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/62b383a0 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/62b383a0 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/62b383a0 Branch: refs/heads/trunk Commit: 62b383a00c3f678b0f504dc71bf36091ddd4067a Parents: 61b9bcb Author: Hari Shreedharan Authored: Fri Mar 28 15:55:25 2014 -0700 Committer: Hari Shreedharan Committed: Fri Mar 28 15:56:25 2014 -0700 ---------------------------------------------------------------------- .../avro/ReliableSpoolingFileEventReader.java | 25 +++- .../TestReliableSpoolingFileEventReader.java | 132 +++++++++++++++---- 2 files changed, 123 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/62b383a0/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java index 1818250..0bc3f23 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java +++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import com.google.common.io.Files; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.IOFileFilter; +import org.apache.commons.lang.StringUtils; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.FlumeException; @@ -429,21 +430,27 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { if (candidateFiles.isEmpty()) { // No matching file in spooling directory. return Optional.absent(); } - + File selectedFile = candidateFiles.get(0); // Select the first random file. if (consumeOrder == ConsumeOrder.RANDOM) { // Selected file is random. return openFile(selectedFile); } else if (consumeOrder == ConsumeOrder.YOUNGEST) { for (File candidateFile: candidateFiles) { - if (candidateFile.lastModified() > - selectedFile.lastModified()) { + long compare = selectedFile.lastModified() - + candidateFile.lastModified(); + if (compare == 0) { // ts is same pick smallest lexicographically. + selectedFile = smallerLexicographical(selectedFile, candidateFile); + } else if (compare < 0) { // candidate is younger (cand-ts > selec-ts) selectedFile = candidateFile; } } } else { // default order is OLDEST for (File candidateFile: candidateFiles) { - if (candidateFile.lastModified() < - selectedFile.lastModified()) { + long compare = selectedFile.lastModified() - + candidateFile.lastModified(); + if (compare == 0) { // ts is same pick smallest lexicographically. + selectedFile = smallerLexicographical(selectedFile, candidateFile); + } else if (compare > 0) { // candidate is older (cand-ts < selec-ts). selectedFile = candidateFile; } } @@ -451,7 +458,13 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { return openFile(selectedFile); } - + + private File smallerLexicographical(File f1, File f2) { + if (f1.getName().compareTo(f2.getName()) < 0) { + return f1; + } + return f2; + } /** * Opens a file for consuming * @param file http://git-wip-us.apache.org/repos/asf/flume/blob/62b383a0/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java index 0b07e7a..6a02612 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java +++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java @@ -203,36 +203,42 @@ public class TestReliableSpoolingFileEventReader { @Test public void testConsumeFileRandomly() throws IOException { - ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder() + ReliableEventReader reader + = new ReliableSpoolingFileEventReader.Builder() .spoolDirectory(WORK_DIR) .consumeOrder(ConsumeOrder.RANDOM) .build(); File fileName = new File(WORK_DIR, "new-file"); - FileUtils.write(fileName, "New file created in the end. Shoud be read randomly.\n"); - Set actual = Sets.newHashSet(); + FileUtils.write(fileName, + "New file created in the end. Shoud be read randomly.\n"); + Set actual = Sets.newHashSet(); readEventsForFilesInDir(WORK_DIR, reader, actual); Set expected = Sets.newHashSet(); createExpectedFromFilesInSetup(expected); expected.add(""); - expected.add("New file created in the end. Shoud be read randomly."); + expected.add( + "New file created in the end. Shoud be read randomly."); Assert.assertEquals(expected, actual); } @Test public void testConsumeFileOldest() throws IOException, InterruptedException { - ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder() - .spoolDirectory(WORK_DIR) - .consumeOrder(ConsumeOrder.OLDEST) - .build(); + ReliableEventReader reader + = new ReliableSpoolingFileEventReader.Builder() + .spoolDirectory(WORK_DIR) + .consumeOrder(ConsumeOrder.OLDEST) + .build(); File file1 = new File(WORK_DIR, "new-file1"); File file2 = new File(WORK_DIR, "new-file2"); File file3 = new File(WORK_DIR, "new-file3"); - FileUtils.write(file2, "New file2 created.\n"); // file2 becoming older than file1 & file3 Thread.sleep(1000L); - FileUtils.write(file1, "New file1 created.\n"); // file1 becoming older than file3 + FileUtils.write(file2, "New file2 created.\n"); + Thread.sleep(1000L); + FileUtils.write(file1, "New file1 created.\n"); + Thread.sleep(1000L); FileUtils.write(file3, "New file3 created.\n"); - + // order of age oldest to youngest (file2, file1, file3) List actual = Lists.newLinkedList(); readEventsForFilesInDir(WORK_DIR, reader, actual); List expected = Lists.newLinkedList(); @@ -245,25 +251,30 @@ public class TestReliableSpoolingFileEventReader { } @Test - public void testConsumeFileYoungest() throws IOException, InterruptedException { - ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder() - .spoolDirectory(WORK_DIR) - .consumeOrder(ConsumeOrder.YOUNGEST) - .build(); - Thread.sleep(1000L); - File file1 = new File(WORK_DIR, "new-file1"); - File file2 = new File(WORK_DIR, "new-file2"); + public void testConsumeFileYoungest() + throws IOException, InterruptedException { + ReliableEventReader reader + = new ReliableSpoolingFileEventReader.Builder() + .spoolDirectory(WORK_DIR) + .consumeOrder(ConsumeOrder.YOUNGEST) + .build(); + File file1 = new File(WORK_DIR, "new-file1"); + File file2 = new File(WORK_DIR, "new-file2"); File file3 = new File(WORK_DIR, "new-file3"); - FileUtils.write(file2, "New file2 created.\n"); // file2 is oldest among file1 & file3. - Thread.sleep(1000L); - FileUtils.write(file3, "New file3 created.\n"); // file3 becomes youngest then file2 but older from file1. - FileUtils.write(file1, "New file1 created.\n"); // file1 becomes youngest in file2 & file3. + Thread.sleep(1000L); + FileUtils.write(file2, "New file2 created.\n"); + Thread.sleep(1000L); + FileUtils.write(file3, "New file3 created.\n"); + Thread.sleep(1000L); + FileUtils.write(file1, "New file1 created.\n"); + // order of age youngest to oldest (file2, file3, file1) List actual = Lists.newLinkedList(); readEventsForFilesInDir(WORK_DIR, reader, actual); List expected = Lists.newLinkedList(); createExpectedFromFilesInSetup(expected); Collections.sort(expected); - expected.add(0, ""); // Empty Line file was added in the last in Setup. + // Empty Line file was added in the last in Setup. + expected.add(0, ""); expected.add(0, "New file2 created."); expected.add(0, "New file3 created."); expected.add(0, "New file1 created."); @@ -271,6 +282,66 @@ public class TestReliableSpoolingFileEventReader { Assert.assertEquals(expected, actual); } + @Test + public void testConsumeFileOldestWithLexicographicalComparision() + throws IOException, InterruptedException { + ReliableEventReader reader + = new ReliableSpoolingFileEventReader.Builder() + .spoolDirectory(WORK_DIR) + .consumeOrder(ConsumeOrder.OLDEST) + .build(); + File file1 = new File(WORK_DIR, "new-file1"); + File file2 = new File(WORK_DIR, "new-file2"); + File file3 = new File(WORK_DIR, "new-file3"); + Thread.sleep(1000L); + FileUtils.write(file3, "New file3 created.\n"); + FileUtils.write(file2, "New file2 created.\n"); + FileUtils.write(file1, "New file1 created.\n"); + file1.setLastModified(file3.lastModified()); + file1.setLastModified(file2.lastModified()); + // file ages are same now they need to be ordered + // lexicographically (file1, file2, file3). + List actual = Lists.newLinkedList(); + readEventsForFilesInDir(WORK_DIR, reader, actual); + List expected = Lists.newLinkedList(); + createExpectedFromFilesInSetup(expected); + expected.add(""); // Empty file was added in the last in setup. + expected.add("New file1 created."); + expected.add("New file2 created."); + expected.add("New file3 created."); + Assert.assertEquals(expected, actual); + } + + @Test + public void testConsumeFileYoungestWithLexicographicalComparision() + throws IOException, InterruptedException { + ReliableEventReader reader + = new ReliableSpoolingFileEventReader.Builder() + .spoolDirectory(WORK_DIR) + .consumeOrder(ConsumeOrder.YOUNGEST) + .build(); + File file1 = new File(WORK_DIR, "new-file1"); + File file2 = new File(WORK_DIR, "new-file2"); + File file3 = new File(WORK_DIR, "new-file3"); + Thread.sleep(1000L); + FileUtils.write(file1, "New file1 created.\n"); + FileUtils.write(file2, "New file2 created.\n"); + FileUtils.write(file3, "New file3 created.\n"); + file1.setLastModified(file3.lastModified()); + file1.setLastModified(file2.lastModified()); + // file ages are same now they need to be ordered + // lexicographically (file1, file2, file3). + List actual = Lists.newLinkedList(); + readEventsForFilesInDir(WORK_DIR, reader, actual); + List expected = Lists.newLinkedList(); + createExpectedFromFilesInSetup(expected); + expected.add(0, ""); // Empty file was added in the last in setup. + expected.add(0, "New file3 created."); + expected.add(0, "New file2 created."); + expected.add(0, "New file1 created."); + Assert.assertEquals(expected, actual); + } + @Test public void testLargeNumberOfFilesOLDEST() throws IOException { templateTestForLargeNumberOfFiles(ConsumeOrder.OLDEST, null, 1000); } @@ -291,9 +362,12 @@ public class TestReliableSpoolingFileEventReader { int N) throws IOException { File dir = null; try { - dir = new File("target/test/work/" + this.getClass().getSimpleName()+ "_large"); + dir = new File( + "target/test/work/" + this.getClass().getSimpleName() + + "_large"); Files.createParentDirs(new File(dir, "dummy")); - ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder() + ReliableEventReader reader + = new ReliableSpoolingFileEventReader.Builder() .spoolDirectory(dir).consumeOrder(order).build(); Map> expected; if (comparator == null) { @@ -328,8 +402,10 @@ public class TestReliableSpoolingFileEventReader { if (order == ConsumeOrder.RANDOM) { Assert.assertTrue(expectedList.remove(new String(e.getBody()))); } else { - Assert.assertEquals(((ArrayList)expectedList).get(0), new String(e.getBody())); - ((ArrayList)expectedList).remove(0); + Assert.assertEquals( + ((ArrayList) expectedList).get(0), + new String(e.getBody())); + ((ArrayList) expectedList).remove(0); } } reader.commit();