flume-user mailing list archives

From Hari Shreedharan <hshreedha...@cloudera.com>
Subject Re: Flafka: KafkaSource EXCEPTION
Date Tue, 10 Mar 2015 04:24:11 GMT
You are probably hitting https://issues.apache.org/jira/browse/FLUME-2578

Add a key to the message as a workaround or build Flume with that patch.
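
In case it helps, a minimal sketch of the producer-side workaround, assuming the 0.8.2 Java producer client and a placeholder broker address (the topic name is taken from the config quoted below; the key and event body are made up). The only point is that the second constructor argument supplies the key whose absence triggers the NPE:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class KeyedEventProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");   // placeholder broker
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer =
                new KafkaProducer<String, String>(props);

            String key = "host-01";                    // any non-null key avoids the NPE
            String event = "{\"example\":\"event body\"}";
            // The second argument is the key; with the default partitioner it is
            // hashed to pick one of the topic's partitions.
            producer.send(new ProducerRecord<String, String>("Events3", key, event));
            producer.close();
        }
    }

Note that keyed sends are partitioned by a hash of the key, so a key with reasonable cardinality keeps the 1024 partitions balanced.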

Alex Bohr wrote:
>
> Hi,
> I've been testing out Flafka for a few days, but I've been getting lots
> of errors and message loss.
>
> Would really appreciate some advice before I abandon it for alternate
> options.
>
> We are still getting lots of these errors:
> 2015-03-10 04:04:19,099
> (PollableSourceRunner-KafkaSource-kafka-source-1) [ERROR -
> org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:139)]
> KafkaSource EXCEPTION, {}
> java.lang.NullPointerException
>
> Here's the version:
> Flume 1.6.0-SNAPSHOT
> Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
> Revision: 3d03053615694ca638e5ddf314081826b8a5f1ac
> Compiled by jenkins on Thu Feb 26 06:06:42 UTC 2015
> From source with checksum 3a3f330e678e5790c8c8e5b99063eea8
>
> Our Kafka events are not stored with a key, but from looking at the
> source it appears this patch is included and should handle that:
> https://issues.apache.org/jira/secure/attachment/12688560/FLUME-2578.0.patch
>
> So I'm not sure what it's still complaining about.
>
> Also getting lots of errors on the sink, and lots of files are left
> with the ".tmp" suffix. I see that's a known issue, but I'm not sure
> how to handle it - how do I know when I can start ingesting a file
> that still has the ".tmp" suffix? Do I need to see if the file size
> changes, or check the last modified time?
>
> We are trying to ingest about 100K QPS from a Kafka topic with 1024
> partitions. I'm not spinning up one agent per partition; we have 4
> machines running 20 agents each. The offsets are holding fairly
> steady, so we're keeping up over a few hours of testing, but the end
> result shows about 1% message loss.
>
> Any advice on configs I should tune to try to reduce the errors? I've
> been playing with transaction capacity and batch sizes, but have seen
> no improvement.
>
> # Sources, channels, and sinks are defined per
> # agent name, in this case flume1.
> flume1.sources = kafka-source-1
> flume1.channels = hdfs-channel-1
> flume1.sinks = hdfs-sink-1
>
> # For each source, channel, and sink, set
> # standard properties.
> flume1.sources.kafka-source-1.type =
> org.apache.flume.source.kafka.KafkaSource
> flume1.sources.kafka-source-1.zookeeperConnect =
> xxx.xx.xx.107:2181/kafkaCluster
> flume1.sources.kafka-source-1.topic = Events3
> flume1.sources.kafka-source-1.groupid = floomTest
> flume1.sources.kafka-source-1.batchSize = 10000
> flume1.sources.kafka-source-1.channels = hdfs-channel-1
>
> flume1.channels.hdfs-channel-1.type = memory
> flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
> flume1.sinks.hdfs-sink-1.type = hdfs
> flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Writable
> flume1.sinks.hdfs-sink-1.hdfs.fileType = SequenceFile
> flume1.sinks.hdfs-sink-1.hdfs.filePrefix =
> %Y-%m-%d-%H-%M-%{host}-1-sequence-events
> flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
> flume1.sinks.hdfs-sink-1.hdfs.path =
> hdfs://xxx.xx.xxx.41:8020/user/gxetl/test_flume/%{topic}
> flume1.sinks.hdfs-sink-1.hdfs.rollCount=0
> flume1.sinks.hdfs-sink-1.hdfs.rollSize=0
> flume1.sinks.hdfs-sink-1.hdfs.rollInterval=120
>
> # Other properties are specific to each type of
> # source, channel, or sink. In this case, we
> # specify the capacity of the memory channel.
> flume1.channels.hdfs-channel-1.capacity = 500000
> flume1.channels.hdfs-channel-1.transactionCapacity = 100000
>
> Any advice much appreciated!
> Thanks!
>
>
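
On the ".tmp" question above: the HDFS sink keeps the in-use suffix (".tmp" by default) while a file is open and only renames it when the file rolls, so a file that still carries the suffix is either still being written or was abandoned by an agent that died before closing it. Below is a rough sketch of the last-modified-time check the question suggests, assuming the Hadoop FileSystem API; the path, staleness threshold, and rename step are illustrative choices, not Flume behavior:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class TmpFileSweeper {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            Path dir = new Path("/user/gxetl/test_flume/Events3");  // sink output dir
            long staleMillis = 10 * 60 * 1000L;   // ~5x the 120s roll interval
            long now = System.currentTimeMillis();

            for (FileStatus status : fs.listStatus(dir)) {
                Path p = status.getPath();
                boolean isTmp = p.getName().endsWith(".tmp");
                boolean stale = now - status.getModificationTime() > staleMillis;
                if (isTmp && stale) {
                    // Nothing has touched the file for several roll intervals,
                    // so treat it as abandoned and drop the suffix so the
                    // normal ingest job will pick it up.
                    Path closedName = new Path(p.getParent(),
                        p.getName().substring(0, p.getName().length() - 4));
                    fs.rename(p, closedName);
                }
            }
        }
    }

One caveat: HDFS does not necessarily refresh the modification time on every write to a file that is still open, so comparing file sizes across two passes (the other approach the question mentions) may be the safer signal for files that are written slowly.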
