flume-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF subversion and git services (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLUME-2318) SpoolingDirectory is unable to handle empty files
Date Wed, 17 Aug 2016 17:26:20 GMT

    [ https://issues.apache.org/jira/browse/FLUME-2318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15424970#comment-15424970
] 

ASF subversion and git services commented on FLUME-2318:
--------------------------------------------------------

Commit a9a775a9c8324e0724ed4720ae4f383896ea8c96 in flume's branch refs/heads/trunk from [~bessbd]
[ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=a9a775a ]

FLUME-2318: Make SpoolingDirectorySource able to handle empty files

(Bessenyei Balázs Donát via Mike Percy)


> SpoolingDirectory is unable to handle empty files
> -------------------------------------------------
>
>                 Key: FLUME-2318
>                 URL: https://issues.apache.org/jira/browse/FLUME-2318
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.4.0
>            Reporter: Muhammad Ehsan ul Haque
>            Assignee: Bessenyei Balázs Donát
>            Priority: Minor
>              Labels: easytest, patch
>             Fix For: v1.7.0
>
>         Attachments: FLUME-2318-0.patch, FLUME-2318-1.patch, FLUME-2318-2.patch
>
>
> Empty files should be returned as an empty event instead of no event.
> h4. Scenario
> From the start consume files in this order
> # f1: File with data or empty file
> # f2: Empty File
> # No file in spooling directory
> h4. Expected Outcome
> # channel.take() should return event with f1 data.
> # channel.take() should return event with f2 data (empty data).
> # channel.take() should return null.
> h4. What happens
> # channel.take() returns event with f1 data.
> # channel.take() returns null.
> # Exception is raised when the SpoolDirectorySource thread tries to read events from
the ReliableSpoolingFileEventReader. Snippet of trace is
> 2014-02-09 15:46:35,832 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:346)]
Preparing to move file /tmp/1391957195572-0/file1 to /tmp/1391957195572-0/file1.COMPLETED
> 2014-02-09 15:46:36,334 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:228)]
Last read was never committed - resetting mark position.
> 2014-02-09 15:46:36,335 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:346)]
Preparing to move file /tmp/1391957195572-0/file2 to /tmp/1391957195572-0/file2.COMPLETED
> 2014-02-09 15:46:36,839 (pool-1-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:252)]
FATAL: Spool Directory source null: { spoolDir: /tmp/1391957195572-0 }: Uncaught exception
in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
> java.lang.IllegalStateException: File should not roll when commit is outstanding.
> 	at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:225)
> 	at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:224)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> 	at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
> 	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:722)
> h4. Unit Test
> In TestSpoolDirectorySource
> {code}
>   @Test
>   public void testWithEmptyFile2()
>       throws InterruptedException, IOException {
>     Context context = new Context();
>     File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
>     Files.write("some data".getBytes(), f1);
>     File f2 = new File(tmpDir.getAbsolutePath() + "/file2");
>     Files.write(new byte[0], f2);
>     context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
>         tmpDir.getAbsolutePath());
>     Configurables.configure(source, context);
>     source.start();
>     Thread.sleep(10);
>     for (int i=0; i<2; i++) {
>       Transaction txn = channel.getTransaction();
>       txn.begin();
>       Event e = channel.take();
>       txn.commit();
>       txn.close();
>     }
>     Transaction txn = channel.getTransaction();
>     txn.begin();
>     Assert.assertNull(channel.take());
>     txn.commit();
>     txn.close();
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message