flume-dev mailing list archives

From "Liu Tianhao (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLUME-3032) taildir source sleeps frequently.
Date Sun, 18 Dec 2016 12:41:58 GMT

    [ https://issues.apache.org/jira/browse/FLUME-3032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15758779#comment-15758779 ]

Liu Tianhao commented on FLUME-3032:
------------------------------------

Sorry, I do not know how to design a JUnit test that covers this change. The change only
avoids an unnecessary sleep. Could you give me a little help? Thank you very much!

> taildir source sleeps frequently.
> ---------------------------------
>
>                 Key: FLUME-3032
>                 URL: https://issues.apache.org/jira/browse/FLUME-3032
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.7.0
>         Environment: CentOS Linux release 7.2.1511 (Core) 
> java version "1.7.0_80"
>            Reporter: Liu Tianhao
>              Labels: newbie
>         Attachments: FLUME-3032.patch
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Test configuration:
> source - taildir
> interceptor - a custom interceptor that drops some events
> channel - any
> sink - none
> I found that the taildir source sleeps frequently.
> The tailFileProcess() function in TaildirSource.java breaks out of its loop when
> (events.size() < batchSize), but an interceptor may change events.size().
> I think events.size() should be read before interceptor processing,
> to avoid an unnecessary sleep.
> {code:title=TaildirSource.java|borderStyle=solid}
>     private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
>             throws IOException, InterruptedException {
>         long receivedSize = 0;
>         while (true) {
>             reader.setCurrentFile(tf);
>             List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
>             if (events.isEmpty()) {
>                 break;
>             }
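>             // Record the batch size now: interceptors run inside
>             // processEventBatch() and may drop events from the list.
>             receivedSize = events.size();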
>             sourceCounter.addToEventReceivedCount(receivedSize);
>             sourceCounter.incrementAppendBatchReceivedCount();
>             try {
>                 getChannelProcessor().processEventBatch(events);
>                 reader.commit();
>             } catch (ChannelException ex) {
>                 logger.warn("The channel is full or unexpected failure. "
>                         + "The source will try again after " + retryInterval
>                         + " ms");
>                 TimeUnit.MILLISECONDS.sleep(retryInterval);
>                 retryInterval = retryInterval << 1;
>                 retryInterval = Math.min(retryInterval, maxRetryInterval);
>                 continue;
>             }
>             retryInterval = 1000;
>             sourceCounter.addToEventAcceptedCount(events.size());
>             sourceCounter.incrementAppendBatchAcceptedCount();
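>             // A short read means the file is drained; compare the size as read,
>             // not the size left after interceptors.
>             if (receivedSize < batchSize) {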
>                 break;
>             }
>         }
>     }
> {code}
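
To illustrate the effect of the loop-exit condition described above, here is a minimal
sketch that does not use any Flume classes. Everything in it is a stand-in introduced for
illustration: readEvents() imitates the reader, dropOddEvents() is a hypothetical interceptor
assumed to remove events from the list in place, and linesReadInOnePass() imitates one call
to tailFileProcess(). It only models the comparison against batchSize, not channels,
counters, or retries.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchLoopSketch {

    /** Stand-in for the reader: returns up to batchSize pending lines and advances the cursor. */
    static List<Integer> readEvents(int[] cursor, int totalLines, int batchSize) {
        List<Integer> events = new ArrayList<>();
        while (cursor[0] < totalLines && events.size() < batchSize) {
            events.add(cursor[0]++);
        }
        return events;
    }

    /** Hypothetical interceptor: drops odd-numbered events by mutating the list in place. */
    static void dropOddEvents(List<Integer> events) {
        events.removeIf(e -> e % 2 != 0);
    }

    /**
     * Simulates one pass of the tailing loop and returns how many lines were
     * read from the file before the loop exited.
     *
     * @param sizeBeforeInterceptor true  = compare the size captured before the interceptor ran,
     *                              false = compare the size left after the interceptor ran
     */
    static int linesReadInOnePass(int totalLines, int batchSize, boolean sizeBeforeInterceptor) {
        int[] cursor = {0};
        while (true) {
            List<Integer> events = readEvents(cursor, totalLines, batchSize);
            if (events.isEmpty()) {
                break;
            }
            int receivedSize = events.size(); // size as read from the file
            dropOddEvents(events);            // stands in for interceptor processing
            int compared = sizeBeforeInterceptor ? receivedSize : events.size();
            if (compared < batchSize) {
                break;                        // treated as "file drained"
            }
        }
        return cursor[0];
    }

    public static void main(String[] args) {
        System.out.println("size after interceptor:  " + linesReadInOnePass(250, 100, false));
        System.out.println("size before interceptor: " + linesReadInOnePass(250, 100, true));
    }
}
```

With 250 pending lines and a batch size of 100, the pass that compares the post-interceptor
size stops after the first batch, leaving 150 lines for a later pass, while the pass that
compares the pre-interceptor size reads all 250. A unit test for the real source could
possibly follow the same idea (a file with more lines than batchSize plus an interceptor
that drops events), but how to wire that into Flume's existing tests is not shown here.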



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
