flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Peter Turcsanyi <turcsa...@cloudera.com>
Subject Re: flume transaction close to begin cost 3-5 seconds
Date Fri, 25 May 2018 10:58:28 GMT
Hi,

Do you mean this 5 second long gap:
2018-05-22 00:03:49,533 (SinkRunner-PollingRunner-DefaultSinkProcessor)
[INFO -
org.apache.flume.sink.sequoiadb.SequoiaDBOneSink.process(SequoiaDBOneSink.java:341)]
transaction close timestamp is :1526918629533
2018-05-22 00:03:54,538 (SinkRunner-PollingRunner-DefaultSinkProcessor)
[INFO -
org.apache.flume.sink.sequoiadb.SequoiaDBOneSink.process(SequoiaDBOneSink.java:310)]
transaction begin timestamp is :1526918634538
?

This is the normal way how the polling thread works. The polling thread
calls the process() method on the sink periodically. If there were no
events to be processed, the sink implementations should return
Status.BACKOFF (as your sink does). In this case the polling thread does
not call the sink again immediately, but sleeps for a while. The sleeping
periods are: 1, 2, 3, 4, 5, 5, 5, ... seconds. (if the consecutive sink
calls returned backoff again).

Based on the log ("txnEventCount is :0"), the sink did not processed any
events, so the waiting periods seem normal.

For further details please see the PollingRunner class in SinkRunner:
https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java

Regards,
Peter Turcsanyi

On Mon, May 21, 2018 at 5:43 PM, 基勇 <252637867@qq.com> wrote:

> Hi,guys:
>      I use flume to read data written to sequoiadb, channel uses kafka
> channel, sink is developed to flume sequoiadb sink itself, testing the time
> to write sequoiadb, but transation close to begin even takes 3-5 seconds.
>      Why does the transaction take so long to close to open?
>      Is there a partner who can help me?  3Q
>
> code:
>
> @Override
>
> public Status process() throws EventDeliveryException {
>
> // TODO Auto-generated method stub
>
> Channel channel = getChannel();
>
>     Transaction transaction = channel.getTransaction();
>
>     LOG.info("transaction begin timestamp is :"
> +System.currentTimeMillis());
>
>     transaction.begin();
>
>     boolean success = false;
>
>
>
>         try {
>
>         int txnEventCount = drainOne(channel);
>
> transaction.commit();
>
>         success = true;
>
>         LOG.info("transaction commit timestamp is :"
> +System.currentTimeMillis());
>
>         if (txnEventCount < 1) {
>
>           return Status.BACKOFF;
>
>         } else {
>
>           return Status.READY;
>
>         }
>
>
>
> } catch (BaseException e) {
>
> LOG.error(e.getMessage(),e);
>
> return Status.BACKOFF;
>
> } catch (InterruptedException e) {
>
> LOG.error(e.getMessage(),e);
>
> return Status.BACKOFF;
>
> } catch (Exception e){
>
> throw new EventDeliveryException(e);
>
> }finally{
>
> if (!success) {
>
>         transaction.rollback();
>
>       }
>
>       transaction.close();
>
>       LOG.info("transaction close timestamp is :"
> +System.currentTimeMillis());
>
> }
>
> }
>
>

Mime
View raw message