flume-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Lior Zeno (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (FLUME-2811) Taildir source doesn't call stop() on graceful shutdown
Date Sat, 23 Jul 2016 08:45:20 GMT

     [ https://issues.apache.org/jira/browse/FLUME-2811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Lior Zeno updated FLUME-2811:
-----------------------------
    Fix Version/s: v1.7.0

I'm scheduling this to 1.7.0, since it's the first release to include this source. Still,
it requires further investigation. 

> Taildir source doesn't call stop() on graceful shutdown
> -------------------------------------------------------
>
>                 Key: FLUME-2811
>                 URL: https://issues.apache.org/jira/browse/FLUME-2811
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.7.0
>            Reporter: Jun Seok Hong
>            Priority: Critical
>              Labels: newbie
>             Fix For: v1.7.0
>
>
> Taildir source doesn't call stop() on graceful shutdown.
> Test configuration.
> source - taildir
> channel - PseudoTxnMemoryChannel / flume-kafka-channel
> sink - none
> I found that flume sometimes doesn't terminate with Taildir source. 
> I had to kill the process to terminate it.
> tailFileProcess() function in TaildirSource.java has a infinite loop.
> When the process interrupted, ChannelException will happen, but it can't breaks the infinite
loop.
> I think that's the reason why Taildir can't call stop() function.
> {code:title=TaildirSource.java|borderStyle=solid}
>  private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
>       throws IOException, InterruptedException {
>     while (true) {
>       reader.setCurrentFile(tf);
>       List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
>       if (events.isEmpty()) {
>         break;
>       }
>       sourceCounter.addToEventReceivedCount(events.size());
>       sourceCounter.incrementAppendBatchReceivedCount();
>       try {
>         getChannelProcessor().processEventBatch(events);
>         reader.commit();
>       } catch (ChannelException ex) {
>         logger.warn("The channel is full or unexpected failure. " +
>           "The source will try again after " + retryInterval + " ms");
>         TimeUnit.MILLISECONDS.sleep(retryInterval);
>         retryInterval = retryInterval << 1;
>         retryInterval = Math.min(retryInterval, maxRetryInterval);
>         continue;
>       }
>       retryInterval = 1000;
>       sourceCounter.addToEventAcceptedCount(events.size());
>       sourceCounter.incrementAppendBatchAcceptedCount();
>       if (events.size() < batchSize) {
>         break;
>       }
>     }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message