flume-user mailing list archives

From Jun MA <mj.saber1...@gmail.com>
Subject Re: Flafka: Kafka channel unable to deliver event
Date Tue, 07 Jul 2015 02:26:33 GMT
Hi Gwen, 

Thank you so much for the comments. I followed your suggestion and got pretty much what I
want, except for one situation: killing an agent unexpectedly.

My test case is: 1 Kafka channel and 1 HDFS sink, 1 topic with 2 partitions, and two agents
reading from the same topic with the same group id (default “flume”). I produced 200,000 messages
evenly distributed across the 2 partitions and brought up two agents to consume them. Once agent2
starts creating HDFS files (with a .tmp suffix), I kill it 2 seconds later (rollInterval is set to 20).
Then agent1 consumes everything that is left, but not the messages that agent2 consumed without
committing the offset (at least I think agent2 didn’t commit, because the transaction
didn’t finish).
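Roughly, this is how I set the test up (a sketch assuming the Kafka 0.8 CLI tools; the exact
flags and the replication factor may differ from what you use):

# create the topic with 2 partitions so the two agents can split the load
kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 2 --topic test2

# produce the 200,000 test messages
seq 1 200000 | kafka-console-producer.sh \
  --broker-list localhost:9093,localhost:9094 --topic test2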
On the HDFS side, the .tmp file is not renamed (as expected), but its size is not shown correctly
in HDFS: it lists only a few KB, although the file actually contains far more than that. The
messages agent1 consumed are not all of the messages from the topic, but added to the .tmp file
that agent2 wrote, they account for everything (plus some duplicates).
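For reference, this is roughly how I compared the sizes (a sketch; the real .tmp file name
includes a timestamp):

# the listed size of the open .tmp file is only a few KB...
hdfs dfs -ls /tmp/incoming-logs

# ...but counting the events actually written shows much more
hdfs dfs -cat '/tmp/incoming-logs/agent2.*.tmp' | wc -l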

My question is: is this the right behavior? How can I get the .tmp file renamed even if an agent
is killed unexpectedly? Or can I make the active agent consume everything after the other agent
goes down? (Duplicates are fine in my case; the only requirement is not to lose data.)
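For now the only cleanup I can think of is renaming leftover files by hand, something like this
(an untested sketch; the path is from my config below):

# strip the .tmp suffix from whatever a dead agent left behind
for f in $(hdfs dfs -ls /tmp/incoming-logs | awk '{print $8}' | grep '\.tmp$'); do
  hdfs dfs -mv "$f" "${f%.tmp}"
done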

Thanks for the help!

My configuration is below.

a1.sinks = sink1
a1.channels = channel1

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.brokerList = localhost:9093,localhost:9094
a1.channels.channel1.topic = test2
a1.channels.channel1.zookeeperConnect = localhost:2181
a1.channels.channel1.parseAsFlumeEvent = false
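# note: the plain "timeout" key, per the FLUME-2734 workaround Rufus describes below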
a1.channels.channel1.timeout = 1000

a1.sinks.sink1.type = hdfs
a1.sinks.sink1.hdfs.path = hdfs://corehadoop/tmp/incoming-logs
a1.sinks.sink1.hdfs.filePrefix = agent1
a1.sinks.sink1.hdfs.rollInterval = 20
a1.sinks.sink1.hdfs.rollSize = 0
a1.sinks.sink1.hdfs.rollCount = 0
a1.sinks.sink1.hdfs.fileType = DataStream
a1.sinks.sink1.channel = channel1

a2.sinks = sink1
a2.channels = channel1

a2.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a2.channels.channel1.brokerList = localhost:9093,localhost:9094
a2.channels.channel1.topic = test2
a2.channels.channel1.zookeeperConnect = localhost:2181
a2.channels.channel1.parseAsFlumeEvent = false
a2.channels.channel1.timeout = 1000

a2.sinks.sink1.type = hdfs
a2.sinks.sink1.hdfs.path = hdfs://corehadoop/tmp/incoming-logs
a2.sinks.sink1.hdfs.filePrefix = agent2
a2.sinks.sink1.hdfs.rollInterval = 20
a2.sinks.sink1.hdfs.rollSize = 0
a2.sinks.sink1.hdfs.rollCount = 0
a2.sinks.sink1.hdfs.fileType = DataStream
a2.sinks.sink1.channel = channel1
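For completeness, I start the two agents like this (the conf file names are mine; each file
holds the corresponding properties above):

flume-ng agent --name a1 --conf conf --conf-file agent1.conf
flume-ng agent --name a2 --conf conf --conf-file agent2.conf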
> On Jul 5, 2015, at 10:14 PM, Gwen Shapira <gshapira@cloudera.com> wrote:
> 
> See inline :)
> 
> On Sun, Jul 5, 2015 at 5:37 PM, Jun MA <mj.saber1990@gmail.com> wrote:
>> Hi Rufus,
>> 
>> Thank you so much for your help; I was able to bypass the issue.
>> 
>> Another question I have: can I have two Flafka agents consume from
>> one topic (with the agents running on the same machine or on different machines)?
> 
> Yes.
> 
>> Will the two
>> agents end up with two identical copies, or will each agent consume one part of the
>> topic?
> 
> It depends on how you configure them.
> If you configure two different consumer group IDs, they will each get a copy.
> If they use the same consumer group ID, each agent will consume from
> a different subset of partitions. They will automatically rebalance
> the partitions if more agents join or if agents are stopped.
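> For example, on the Kafka channel it could look like this (a sketch; the
> group names here are made up, and groupId defaults to "flume"):
> 
> # same group ID on both agents -> they split the partitions between them
> a1.channels.channel1.groupId = flume-ha
> a2.channels.channel1.groupId = flume-ha
> 
> # different group IDs -> each agent gets a full copy of the topic
> a1.channels.channel1.groupId = flume-copy-1
> a2.channels.channel1.groupId = flume-copy-2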
> 
>> The reason I want to run two agents on the same topic is that I want
>> high availability. What I want is for the two agents to consume
>> different portions of the topic, but if one agent goes down, the other one will
>> consume everything from the topic.
>> From what I tested, I bring up two agents on the same machine with different
>> agent names (everything else is the same), and only the first one can
>> consume messages from the topic. I’m wondering if this is the right behavior. If
>> so, is there any way I can solve this single point of failure?
> 
> If you configure both Flafka agents with the same consumer group ID, you should
> get the behavior you want.
> Just make sure you have at least two partitions - the load balancing
> behavior happens at the partition level, so you can only have as many
> concurrent agents as you have partitions.
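> You can check how many partitions a topic has with the Kafka CLI, e.g.:
> 
> kafka-topics.sh --describe --zookeeper localhost:2181 --topic test2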
> 
> Gwen
> 
>> 
>> Bests,
>> Jun
>> 
>> On Jul 4, 2015, at 10:22 AM, Johny Rufus <jrufus@cloudera.com> wrote:
>> 
>> Been looking into this for some time and found a couple of issues. I raised
>> JIRAs for both; there are workarounds to get past these errors.
>> 
>> https://issues.apache.org/jira/browse/FLUME-2734
>> https://issues.apache.org/jira/browse/FLUME-2735
>> 
>> For the timeout to take effect, you need to specify it as
>> agent1.channels.channel1.timeout = 1000000 (this is a temporary workaround;
>> kafka.consumer.timeout.ms should work as per the guide once FLUME-2734 is
>> resolved).
>> 
>> To get past the IllegalStateException, I had to download the zookeeper jar
>> from http://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper/3.3.6
>> and put it in the lib directory. This is also a workaround to get past the
>> issue, until we figure out the root cause.
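>> Roughly, assuming a standard install layout (adjust the path for yours):
>> 
>> cp zookeeper-3.3.6.jar /path/to/apache-flume-1.6.0-bin/lib/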
>> 
>> 
>> Thanks,
>> Rufus
>> 
>> On Fri, Jul 3, 2015 at 10:09 PM, Jun Ma <mj.saber1990@gmail.com> wrote:
>>> 
>>> Oops, thanks for figuring that out.
>>> Any idea why it is unable to deliver the event?
>>> 
>>> On Fri, Jul 3, 2015 at 8:00 PM, Johny Rufus <jrufus@cloudera.com> wrote:
>>>> 
>>>> There is a typo in your property "cnannel1", hence the property is not
>>>> set
>>>> 
>>>> a1.channels.cnannel1.kafka.consumer.timeout.ms = 1000000
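>>>> (with the typo fixed it would read:
>>>> a1.channels.channel1.kafka.consumer.timeout.ms = 1000000)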
>>>> 
>>>> Thanks,
>>>> Rufus
>>>> 
>>>> On Fri, Jul 3, 2015 at 4:49 PM, Jun Ma <mj.saber1990@gmail.com> wrote:
>>>>> 
>>>>> Thanks for your reply. But from what I read, the magic thing Flafka
>>>>> does is that you don't need to have a source; you can move things directly
>>>>> from the channel to the sink.
>>>>> 
>>>>> http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/
>>>>> (see the section on Flume's Kafka channel)
>>>>> 
>>>>> 
>>>>> On Fri, Jul 3, 2015 at 1:26 PM, Foo Lim <foo.lim@vungle.com> wrote:
>>>>>> 
>>>>>> There's an error that you didn't specify a source:
>>>>>> 
>>>>>> Agent configuration for 'a1' has no sources.
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
>> 

