flume-user mailing list archives

From Alex Bohr <a...@gradientx.com>
Subject Flafka: KafkaSource EXCEPTION
Date Tue, 10 Mar 2015 04:19:15 GMT
Hi,
I've been testing out Flafka for a few days but I've been getting lots of
errors and message loss.

Would really appreciate some advice before I abandon it for an alternative.

We are still getting lots of these errors:
2015-03-10 04:04:19,099 (PollableSourceRunner-KafkaSource-kafka-source-1) [ERROR - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:139)] KafkaSource EXCEPTION, {}
java.lang.NullPointerException

Here's the version:
Flume 1.6.0-SNAPSHOT
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 3d03053615694ca638e5ddf314081826b8a5f1ac
Compiled by jenkins on Thu Feb 26 06:06:42 UTC 2015
From source with checksum 3a3f330e678e5790c8c8e5b99063eea8

Our Kafka events are not stored with a key, but from looking at the source it
appears this patch is included and should handle that:
https://issues.apache.org/jira/secure/attachment/12688560/FLUME-2578.0.patch

So I'm not sure what it's still complaining about.
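
For reference, here's my reading of what that patch does, as a standalone
sketch (the helper and the header names are my own paraphrase, not the actual
Flume source): only attach a key header when the Kafka message actually has
one, so keyless messages no longer NPE.

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class KeylessEventSketch {
    // Paraphrase of the null-key guard I understand FLUME-2578 to add.
    // The header names ("topic", "timestamp", "key") are my assumption.
    public static Event toEvent(String topic, byte[] kafkaKey, byte[] body) {
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("topic", topic);
        headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
        if (kafkaKey != null) {
            // Only set the key header when the message was produced with a key.
            headers.put("key", new String(kafkaKey));
        }
        return EventBuilder.withBody(body, headers);
    }
}

If that guard is really in my build, the NPE at KafkaSource.java:139 must be
coming from something else.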

We're also getting lots of errors on the sink, and lots of files are left with
a ".tmp" suffix. I see that's a known issue, but I'm not sure how to handle
it: how do I know when I can start ingesting a file that still has the ".tmp"
suffix? Do I watch for the file size to stop changing, or check the
last-modified time?
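
In the meantime I'm considering filtering on the consumer side, along the
lines of this rough sketch against the Hadoop FileSystem API (the grace
period is my own guess, to allow for an in-flight close/rename):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ClosedFileFinder {
    // Skip files the HDFS sink is still writing (".tmp" suffix) and
    // anything modified within the last graceMillis, in case the
    // close/rename hasn't happened yet.
    public static List<Path> listIngestable(FileSystem fs, Path dir, long graceMillis)
            throws IOException {
        List<Path> ready = new ArrayList<Path>();
        long cutoff = System.currentTimeMillis() - graceMillis;
        for (FileStatus status : fs.listStatus(dir)) {
            if (status.isFile()
                    && !status.getPath().getName().endsWith(".tmp")
                    && status.getModificationTime() < cutoff) {
                ready.add(status.getPath());
            }
        }
        return ready;
    }
}

But that still doesn't tell me whether an orphaned .tmp file from a crashed
agent is complete, which is really what I'm asking.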

We are trying to ingest about 100K QPS from a Kafka queue with 1024
partitions. I'm not spinning up that many Flume agents; we have 4 machines
running 20 agents each. The offsets are holding fairly steady, so we're
keeping up over a few hours of testing.
But the end result shows about 1% message loss.
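(Back of the envelope: 80 agents over 1024 partitions is roughly 13
partitions per agent, and 100K QPS across 80 agents is 1,250 events/sec per
agent, so raw per-agent throughput shouldn't be the bottleneck. My best guess
is that events sitting in a memory channel are lost whenever an agent dies or
hits one of these exceptions, but I haven't confirmed that.)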

Any advice on configs I should tune to try to reduce the errors?
I've been playing with transaction capacity and batch sizes but have seen no
improvement.
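
(For sizing, my understanding is that the source batchSize must be at or
below the channel's transactionCapacity, which in turn must be at or below
its capacity; with 10,000 <= 100,000 <= 500,000 below, that holds, so I don't
think a plain sizing violation explains the errors.)

Here's the full agent config: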

# Sources, channels, and sinks are defined per
# agent name, in this case flume1.
flume1.sources  = kafka-source-1
flume1.channels = hdfs-channel-1
flume1.sinks    = hdfs-sink-1

# For each source, channel, and sink, set
# standard properties.
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.zookeeperConnect = xxx.xx.xx.107:2181/kafkaCluster
flume1.sources.kafka-source-1.topic = Events3
flume1.sources.kafka-source-1.groupId = floomTest
flume1.sources.kafka-source-1.batchSize = 10000
flume1.sources.kafka-source-1.channels = hdfs-channel-1

flume1.channels.hdfs-channel-1.type   = memory
flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
flume1.sinks.hdfs-sink-1.type = hdfs
flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Writable
flume1.sinks.hdfs-sink-1.hdfs.fileType = SequenceFile
flume1.sinks.hdfs-sink-1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%{host}-1-sequence-events
flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-1.hdfs.path = hdfs://xxx.xx.xxx.41:8020/user/gxetl/test_flume/%{topic}
flume1.sinks.hdfs-sink-1.hdfs.rollCount = 0
flume1.sinks.hdfs-sink-1.hdfs.rollSize = 0
flume1.sinks.hdfs-sink-1.hdfs.rollInterval = 120

# Other properties are specific to each type of
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.
flume1.channels.hdfs-channel-1.capacity = 500000
flume1.channels.hdfs-channel-1.transactionCapacity = 100000
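
(One more data point: with rollCount and rollSize both set to 0, files only
roll on the 120-second rollInterval, so any agent that dies mid-interval
would leave its open file behind with the ".tmp" suffix - which matches what
we're seeing.)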

Any advice much appreciated!
Thanks!
