flume-dev mailing list archives

From "Attila Simon (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLUME-3032) taildir source sleeps frequently.
Date Wed, 21 Dec 2016 16:42:58 GMT

    [ https://issues.apache.org/jira/browse/FLUME-3032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15767508#comment-15767508 ]

Attila Simon commented on FLUME-3032:
-------------------------------------

I would refactor {{tailFileProcess()}} into a separate class that receives its dependencies
via its constructor and exposes this function as part of its public API. That class could then be
unit-tested by mocking its constructor parameters and checking whether all the events were passed to the
channel after a single invocation of {{tailFileProcess()}}. The mock that will be needed, {{getChannelProcessor().processEventBatch()}},
should also change the number of {{events}} besides counting how many arrived (to imitate an interceptor
that drops one, and to verify that all events arrived). I think mocking the {{reader}}
to simply produce {{events}} would be more convenient than setting up temporary files.
I know this would be a relatively bigger change compared to your existing patch, but it would
improve code quality. If you think you need help with it, I'm happy to answer
your questions or provide more details.
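A minimal sketch of that refactoring, assuming hypothetical {{EventSource}} and {{BatchSink}} interfaces that stand in for the {{reader}} and for {{getChannelProcessor()}}; neither name is part of Flume's API. Events are plain strings, and the source counters and the retry/back-off logic are left out. Instead of a mocking library, the test doubles are hand-written: {{FakeReader}} serves a fixed number of events, and {{DroppingSink}} counts how many events arrived and then removes one from the list in place, imitating a filtering interceptor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the reader dependency (not Flume's API).
interface EventSource {
    List<String> readEvents(int batchSize);
    void commit();
}

// Hypothetical stand-in for getChannelProcessor().processEventBatch().
interface BatchSink {
    void processEventBatch(List<String> events);
}

// Receives its dependencies via the constructor, so a test can pass fakes.
class TailFileProcessor {
    private final EventSource reader;
    private final BatchSink sink;
    private final int batchSize;

    TailFileProcessor(EventSource reader, BatchSink sink, int batchSize) {
        this.reader = reader;
        this.sink = sink;
        this.batchSize = batchSize;
    }

    // Drains the source batch by batch; returns how many batches were delivered.
    int tailFileProcess() {
        int batches = 0;
        while (true) {
            List<String> events = reader.readEvents(batchSize);
            if (events.isEmpty()) {
                break;
            }
            // Size as read, captured before the sink (or an interceptor) can shrink the list.
            int receivedSize = events.size();
            sink.processEventBatch(events);
            reader.commit();
            batches++;
            if (receivedSize < batchSize) {
                break;
            }
        }
        return batches;
    }
}

// Fake reader that serves a fixed number of events.
class FakeReader implements EventSource {
    private int remaining;

    FakeReader(int total) {
        this.remaining = total;
    }

    public List<String> readEvents(int batchSize) {
        List<String> out = new ArrayList<>();
        while (remaining > 0 && out.size() < batchSize) {
            out.add("line-" + remaining);
            remaining--;
        }
        return out;
    }

    public void commit() {
        // Nothing to persist in the fake.
    }
}

// Fake sink: counts arrivals, then drops one event in place like a filtering interceptor.
class DroppingSink implements BatchSink {
    int arrived = 0;

    public void processEventBatch(List<String> events) {
        arrived += events.size();
        if (!events.isEmpty()) {
            events.remove(events.size() - 1);
        }
    }
}

public class TailFileProcessorDemo {
    public static void main(String[] args) {
        DroppingSink sink = new DroppingSink();
        int batches = new TailFileProcessor(new FakeReader(250), sink, 100).tailFileProcess();
        // prints "3 batches, 250 events arrived" (batches of 100, 100 and 50)
        System.out.println(batches + " batches, " + sink.arrived + " events arrived");
    }
}
```

Passing the dependencies through the constructor is what lets a single {{tailFileProcess()}} call be checked in isolation; a mocking library such as Mockito could replace the hand-written fakes, but the fakes keep the sketch dependency-free.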

Another option would be to test this functionality via the public {{process()}} function
by mocking all the relevant objects described above, plus the ones {{process()}}
itself requires to work.

There might be other options; I guess every idea is welcome.


I just double-checked: you missed a space before '=' here: {{long receivedSize= 0;}}
and a semicolon here: {{receivedSize = events.size()}}

> taildir source sleeps frequently.
> ---------------------------------
>
>                 Key: FLUME-3032
>                 URL: https://issues.apache.org/jira/browse/FLUME-3032
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.7.0
>         Environment: CentOS Linux release 7.2.1511 (Core) 
> java version "1.7.0_80"
>            Reporter: Liu Tianhao
>              Labels: newbie
>         Attachments: FLUME-3032.patch
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Test configuration.
> source - taildir
> interceptor - a custom interceptor that drops some events
> channel - any
> sink - none
> I found that the taildir source sleeps frequently.
> The tailFileProcess() function in TaildirSource.java breaks the loop when (events.size() < batchSize), but an interceptor may change events.size().
> I think the events.size() from before interceptor processing should be used instead,
> to avoid unnecessary sleep.
> {code:title=TaildirSource.java|borderStyle=solid}
>     private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
>             throws IOException, InterruptedException {
>         long receivedSize = 0;
>         while (true) {
>             reader.setCurrentFile(tf);
>             List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
>             if (events.isEmpty()) {
>                 break;
>             }
>             receivedSize = events.size();
>             sourceCounter.addToEventReceivedCount(receivedSize);
>             sourceCounter.incrementAppendBatchReceivedCount();
>             try {
>                 getChannelProcessor().processEventBatch(events);
>                 reader.commit();
>             } catch (ChannelException ex) {
>                 logger.warn("The channel is full or unexpected failure. "
>                         + "The source will try again after " + retryInterval
>                         + " ms");
>                 TimeUnit.MILLISECONDS.sleep(retryInterval);
>                 retryInterval = retryInterval << 1;
>                 retryInterval = Math.min(retryInterval, maxRetryInterval);
>                 continue;
>             }
>             retryInterval = 1000;
>             sourceCounter.addToEventAcceptedCount(events.size());
>             sourceCounter.incrementAppendBatchAcceptedCount();
>             if (receivedSize < batchSize) {
>                 break;
>             }
>         }
>     }
> {code}
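To make the effect concrete, here is a small simulation (not Flume code) of a single {{tailFileProcess()}} call. It assumes, as the report does, that the interceptor shrinks the same {{events}} list the loop inspects afterwards; {{eventsReadInOneCall}} and its parameters are hypothetical. With the {{events.size() < batchSize}} check described in the report, one dropped event makes a full batch look short, so the loop exits after one batch even though more lines are pending; checking {{receivedSize}}, recorded before the batch is handed on, keeps reading until the reader really returns a short or empty batch.

```java
import java.util.ArrayList;
import java.util.List;

public class BreakConditionDemo {
    // Simulates one tailFileProcess() call over `available` pending lines and
    // returns how many lines that call reads.
    // dropsPerBatch: events a (hypothetical) interceptor removes from each batch in place.
    // checkBeforeInterceptor: true = break on receivedSize, false = break on events.size().
    static int eventsReadInOneCall(int available, int batchSize, int dropsPerBatch,
                                   boolean checkBeforeInterceptor) {
        int read = 0;
        while (true) {
            List<Integer> events = new ArrayList<>();
            while (read < available && events.size() < batchSize) {
                events.add(read++);
            }
            if (events.isEmpty()) {
                break;
            }
            long receivedSize = events.size();
            // Imitate an interceptor that filters events out of the same list.
            for (int i = 0; i < dropsPerBatch && !events.isEmpty(); i++) {
                events.remove(events.size() - 1);
            }
            long sizeForBreak = checkBeforeInterceptor ? receivedSize : events.size();
            if (sizeForBreak < batchSize) {
                break;
            }
        }
        return read;
    }

    public static void main(String[] args) {
        // 1000 pending lines, batchSize 100, one event dropped per batch.
        System.out.println(eventsReadInOneCall(1000, 100, 1, false)); // 100
        System.out.println(eventsReadInOneCall(1000, 100, 1, true));  // 1000
    }
}
```

With the post-interceptor check, the remaining 900 lines have to wait for a later pass of the source, which matches the frequent sleeping the report describes; with no events dropped, both checks behave the same.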



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
