flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jeff Lord <jl...@cloudera.com>
Subject Re: Flume Embedded Agent Interrupted in Handshake
Date Fri, 04 Jul 2014 18:35:08 GMT
Adam,

You are mostly correct. The one thing I might add that may help is to know
that the sink is consuming the events from the channel, writing them to the
next hop source and then committing the transaction. As opposed to the
channel pushing the events, as the channel is a passive component. You
should be able to leave out the this.agent.put(null); and just sleep to
allow the sink enough time to write the events in the channel to the next
hop and commit. As long as the agent is running the sink will try to pull
events off the channel. If there are no events left it will simply backoff
and try again.

Another option would be to poll the ChannelSize metric to know when the
channel is actually empty and then make the call to gracefully stop the
agent. I know there is one open jira that relates to this here.

https://issues.apache.org/jira/browse/FLUME-1318

It might be worth opening another to improve the graceful shutdown of the
embedded agent.

Best,

Jeff


On Fri, Jul 4, 2014 at 9:08 AM, Adam Higginson <Adam.Higginson@bjss.com>
wrote:

>  Hi,
>
>
>
> I’m currently attempting to make use of an Embedded Agent with a file
> based channel which will be used to write to another agent and then
> ultimately into a hdfs sequence file. The connection is successfully made,
> and data is sent across to the other agent (and then a sequence file).
> However, I’m trying to understand how to close the agent and flush out any
> ‘local’ data that still may need to be transferred on program close.
>
>
>
> As I am using an AvroSink, I read that if a channel sends a null event the
> batch is immediately sent (read from
> https://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/AvroSink.html).
> My idea, then, was to pass null to the agent just as the program is
> requested to stop, like so:
>
>
>
> *this*.agent.put(*null*);
>
> *this*.agent.stop();
>
>
>
> This seemed to attempt to send the batch, however when the agent.stop()
> method is called, an error is printed in the logs:
>
>
>
> *org.apache.flume.EventDeliveryException*: Failed to send events
>
>        at org.apache.flume.sink.AbstractRpcSink.process(
> *AbstractRpcSink.java:382*) ~[flume-ng-core-1.4.0-cdh4.4.0.jar:na]
>
>        at org.apache.flume.sink.FailoverSinkProcessor.process(
> *FailoverSinkProcessor.java:182*) ~[flume-ng-core-1.4.0-cdh4.4.0.jar:na]
>
>        at org.apache.flume.SinkRunner$PollingRunner.run(
> *SinkRunner.java:147*) [flume-ng-core-1.4.0-cdh4.4.0.jar:na]
>
>        at java.lang.Thread.run(*Thread.java:744*) [na:1.7.0_45]
>
> Caused by: *org.apache.flume.EventDeliveryException*: NettyAvroRpcClient
> { host: oraclelinux6.localdomain, port: 44444 }: Failed to send batch
>
>        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(
> *NettyAvroRpcClient.java:294*) ~[flume-ng-sdk-1.4.0-cdh4.4.0.jar:na]
>
>        at org.apache.flume.sink.AbstractRpcSink.process(
> *AbstractRpcSink.java:366*) ~[flume-ng-core-1.4.0-cdh4.4.0.jar:na]
>
>        ... 3 common frames omitted
>
> Caused by: *org.apache.flume.EventDeliveryException*: NettyAvroRpcClient
> { host: oraclelinux6.localdomain, port: 44444 }: Interrupted in handshake
>
>        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(
> *NettyAvroRpcClient.java:341*) ~[flume-ng-sdk-1.4.0-cdh4.4.0.jar:na]
>
>        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(
> *NettyAvroRpcClient.java:282*) ~[flume-ng-sdk-1.4.0-cdh4.4.0.jar:na]
>
>        ... 4 common frames omitted
>
> Caused by: *java.lang.InterruptedException*: null
>
>        at java.util.concurrent.FutureTask.awaitDone(*FutureTask.java:400*)
> ~[na:1.7.0_45]
>
>        at java.util.concurrent.FutureTask.get(*FutureTask.java:199*)
> ~[na:1.7.0_45]
>
>        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(
> *NettyAvroRpcClient.java:336*) ~[flume-ng-sdk-1.4.0-cdh4.4.0.jar:na]
>
>        ... 5 common frames omitted
>
>
>
> From the error message, “Interrupted in handshake”, I would suggest that
> whilst attempting to send the batch, the agent.stop() method interrupted it
> and caused the exception. When I put a Thread.sleep(10000) between putting
> a null event and the stop method, the problem seems to disappear and the
> batch is successfully flushed (from what I can see). I was wondering
> whether I have understood fully what is going on, and if so, whether there
> is a more elegant way of shutting down the embedded agent and flushing any
> events that are still stored locally?
>
>
>
> Any help would be greatly appreciated.
>
>
>
> Thanks and kind regards,
>
>
>
> Adam
>  The information included in this email and any files transmitted with it
> may contain information that is confidential and it must not be used by, or
> its contents or attachments copied or disclosed, to persons other than the
> intended addressee. If you have received this email in error, please notify
> BJSS. In the absence of written agreement to the contrary BJSS' relevant
> standard terms of contract for any work to be undertaken will apply. Please
> carry out virus or such other checks as you consider appropriate in respect
> of this email. BJSS do not accept responsibility for any adverse effect
> upon your system or data in relation to this email or any files transmitted
> with it. BJSS Limited, a company registered in England and Wales (Company
> Number 2777575), VAT Registration Number 613295452, Registered Office
> Address, First Floor, Coronet House, Queen Street, Leeds, LS1 2TW
>

Mime
View raw message