flume-user mailing list archives

From Sagar Mehta <sagarme...@gmail.com>
Subject Flume Ng replaying events when the source is idle
Date Wed, 27 Feb 2013 19:37:40 GMT
Hi Guys,

I'm using Flume NG and it has been working pretty well, except for a weird
situation I observed lately. In essence, I'm using an exec source to do a
tail -F on a logfile, feeding two HDFS sinks through file channels.

However, I have observed that when the source [the logfile of a Jetty-based
collector] is idle - that is, when no new events are pushed to the logfile -
Flume NG seems to replay the same set of events.

For example, collector110 received no events for two consecutive hours;
below are the corresponding Flume-written files at the HDFS sink:

hadoop@jobtracker301:/home/hadoop/sagar$ hls
-rw-r--r--   3 hadoop supergroup        441 2013-02-27 14:20
-rw-r--r--   3 hadoop supergroup        441 2013-02-27 14:50

hadoop@jobtracker301:/home/hadoop/sagar$ hls
-rw-r--r--   3 hadoop supergroup        441 2013-02-27 15:20
-rw-r--r--   3 hadoop supergroup        441 2013-02-27 15:50

hadoop@jobtracker301:/home/hadoop/sagar$ md5sum *

As you can see above, the md5sums match, so the same 441 bytes were written
again.

I'm using file channels, which keep checkpoints, so I'm not sure what is
going on. By the way, the difference between the timestamps of the two
replays looks like exactly 30 minutes.
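[Editor's note: one plausible mechanism, an assumption on my part rather than anything the config proves, is that `tail -F` emits the last 10 lines of the file whenever it starts, so if the exec source silently restarts the tail command - say, every 30 minutes - the same trailing lines get re-delivered as fresh events. A quick way to see the startup-backlog behavior on a static file:]

```shell
# Demonstrate tail's startup backlog: by default, tail emits the last
# 10 lines of an existing file before it starts following new data.
printf 'line %s\n' 1 2 3 4 5 6 7 8 9 10 11 12 > /tmp/logFile.demo

# First "start" of the tail: prints lines 3 through 12.
tail -n 10 /tmp/logFile.demo

# A simulated restart prints the exact same 10 lines again, even though
# nothing new was appended in between -> duplicate events downstream.
tail -n 10 /tmp/logFile.demo

# Starting with -n 0 emits nothing until new data is actually appended.
tail -n 0 /tmp/logFile.demo
```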

*Is this a known bug or am I missing something?*
*Below is my Flume config file*

smehta@collector110:/opt/flume/conf$ cat hdfs.conf
# Two HDFS sinks that write events to HDFS on the test cluster
# Two file-based channels connect the source to the sinks

# Name the components on this agent
collector110.sources = source1
collector110.sinks = sink1 sink2
collector110.channels = channel1 channel2

# Configure the source
collector110.sources.source1.type = exec
collector110.sources.source1.command = tail -F /opt/jetty/logFile.log
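[Editor's note: a hedged variant worth trying - `tail -F` replays the last 10 lines on every (re)start, so starting at the end of the file and making the exec source's restart behavior explicit may eliminate the duplicates. `restart` and `restartThrottle` are standard Flume NG exec source properties; the exact values here are illustrative:]

```
# Start tailing at the end of the file so a (re)started tail emits no backlog
collector110.sources.source1.command = tail -n 0 -F /opt/jetty/logFile.log

# Exec source restart knobs: whether to rerun the command if it exits,
# and how long (ms) to wait before doing so
collector110.sources.source1.restart = true
collector110.sources.source1.restartThrottle = 10000
```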

# Configure the interceptors
collector110.sources.source1.interceptors = TimestampInterceptor HostInterceptor

# We use the Timestamp interceptor to record when Flume receives each event.
# This is used for figuring out the bucket to which an event goes
collector110.sources.source1.interceptors.TimestampInterceptor.type = timestamp

# We use the Host interceptor to populate the host header with the fully
# qualified domain name of the collector.
# That way we know which file in the sink represents which collector.
collector110.sources.source1.interceptors.HostInterceptor.type = host
collector110.sources.source1.interceptors.HostInterceptor.preserveExisting = false
collector110.sources.source1.interceptors.HostInterceptor.useIP = false
collector110.sources.source1.interceptors.HostInterceptor.hostHeader = host

# Configure the sink

collector110.sinks.sink1.type = hdfs

# Configure the bucketing

# Prefix the file with the source so that we know where the events in the
# file came from
collector110.sinks.sink1.hdfs.filePrefix = %{host}

# We roll the flume output file based on time interval - currently every 5
# minutes (300 seconds)
collector110.sinks.sink1.hdfs.rollSize = 0
collector110.sinks.sink1.hdfs.rollCount = 0
collector110.sinks.sink1.hdfs.rollInterval = 300

#gzip compression related settings
collector110.sinks.sink1.hdfs.codeC = gzip
collector110.sinks.sink1.hdfs.fileType = CompressedStream
collector110.sinks.sink1.hdfs.fileSuffix = .gz

# Configure the sink

collector110.sinks.sink2.type = hdfs

# Configure the bucketing

# Prefix the file with the source so that we know where the events in the
# file came from
collector110.sinks.sink2.hdfs.filePrefix = %{host}

# We roll the flume output file based on time interval - currently every 5
# minutes (300 seconds)
collector110.sinks.sink2.hdfs.rollSize = 0
collector110.sinks.sink2.hdfs.rollCount = 0
collector110.sinks.sink2.hdfs.rollInterval = 300

#gzip compression related settings
collector110.sinks.sink2.hdfs.codeC = gzip
collector110.sinks.sink2.hdfs.fileType = CompressedStream
collector110.sinks.sink2.hdfs.fileSuffix = .gz

# Configure the channel that connects the source to the sink

# Use a channel which buffers events in filesystem
collector110.channels.channel1.type = file
collector110.channels.channel1.checkpointDir =
collector110.channels.channel1.dataDirs = /data/flume_data/channel1/data

# Use a channel which buffers events in filesystem
collector110.channels.channel2.type = file
collector110.channels.channel2.checkpointDir =
collector110.channels.channel2.dataDirs = /data/flume_data/channel2/data

# Bind the source and sink to the channel configured above
collector110.sources.source1.channels = channel1 channel2
collector110.sinks.sink1.channel = channel1
collector110.sinks.sink2.channel = channel2
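[Editor's note: for what it's worth, when a source lists multiple channels, Flume's default channel selector already replicates every event to each of them; spelling that out makes the fan-out to both HDFS sinks explicit. Optional, and an assumption about the intended behavior:]

```
# Explicitly replicate each event to both channel1 and channel2
# (this is already the default selector for a multi-channel source)
collector110.sources.source1.selector.type = replicating
```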

