flume-user mailing list archives

From Roshan Naik <ros...@hortonworks.com>
Subject Re: Flume Ng replaying events when the source is idle
Date Thu, 28 Feb 2013 22:43:37 GMT
Would you be able to verify whether the same problem can be reproduced in a
test setup by using the memory channel instead of the file channel?
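
For example, here is a minimal sketch of that test, reusing your channel
names (the capacity values below are illustrative, not tuned recommendations):

collector110.channels.channel1.type = memory
collector110.channels.channel1.capacity = 10000
collector110.channels.channel1.transactionCapacity = 1000

collector110.channels.channel2.type = memory
collector110.channels.channel2.capacity = 10000
collector110.channels.channel2.transactionCapacity = 1000

If the duplicates disappear with the memory channel, that would point at the
file channel or its checkpoints; if they persist, the problem is likely
upstream of the channels.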


On Wed, Feb 27, 2013 at 11:37 AM, Sagar Mehta <sagarmehta@gmail.com> wrote:

> Hi Guys,
>
> I'm using Flume NG and it is working pretty well, except for a weird
> situation I observed recently. In essence, I'm using an exec source to
> tail -F a logfile, feeding two HDFS sinks through file channels.
>
> However, I have observed that when the source [the logfile of a Jetty-based
> collector] is idle, that is, no new events are pushed to the logfile,
> Flume NG seems to replay the same set of events.
>
> For example, collector110 received no events for two consecutive hours, and
> below are the corresponding Flume-written files at the HDFS sink:
>
> hadoop@jobtracker301:/home/hadoop/sagar$ hls /ngpipes-raw-logs/2013-02-27/1400/collector110*
> -rw-r--r--   3 hadoop supergroup        441 2013-02-27 14:20 /ngpipes-raw-logs/2013-02-27/1400/collector110.ngpipes.sac.ngmoco.com.1361974853210.gz
> -rw-r--r--   3 hadoop supergroup        441 2013-02-27 14:50 /ngpipes-raw-logs/2013-02-27/1400/collector110.ngpipes.sac.ngmoco.com.1361976653432.gz
>
> hadoop@jobtracker301:/home/hadoop/sagar$ hls /ngpipes-raw-logs/2013-02-27/1500/collector110*
> -rw-r--r--   3 hadoop supergroup        441 2013-02-27 15:20 /ngpipes-raw-logs/2013-02-27/1500/collector110.ngpipes.sac.ngmoco.com.1361978454123.gz
> -rw-r--r--   3 hadoop supergroup        441 2013-02-27 15:50 /ngpipes-raw-logs/2013-02-27/1500/collector110.ngpipes.sac.ngmoco.com.1361980254338.gz
>
> hadoop@jobtracker301:/home/hadoop/sagar$ md5sum *
> c7360ef5c8deaee3ce9f4c92e9d9be63  collector110.ngpipes.sac.ngmoco.com.1361974853210.gz
> c7360ef5c8deaee3ce9f4c92e9d9be63  collector110.ngpipes.sac.ngmoco.com.1361976653432.gz
> c7360ef5c8deaee3ce9f4c92e9d9be63  collector110.ngpipes.sac.ngmoco.com.1361978454123.gz
> c7360ef5c8deaee3ce9f4c92e9d9be63  collector110.ngpipes.sac.ngmoco.com.1361980254338.gz
>
> As you can see above, all four md5sums match.
>
> I'm using a file channel, which has checkpoints, so I'm not sure what is
> going on. By the way, the difference between the timestamps of the two
> replayed files is exactly 30 minutes.
>
> Is this a known bug, or am I missing something?
>
> Below is my Flume config file:
>
> smehta@collector110:/opt/flume/conf$ cat hdfs.conf
> # Two HDFS sinks to write events to HDFS on the test clusters
> # Two file-based channels to connect the above source to the sinks
>
> # Name the components on this agent
> collector110.sources = source1
> collector110.sinks = sink1 sink2
> collector110.channels = channel1 channel2
>
> # Configure the source
> collector110.sources.source1.type = exec
> collector110.sources.source1.command = tail -F /opt/jetty/logFile.log
>
> # Configure the interceptors
> collector110.sources.source1.interceptors = TimestampInterceptor HostInterceptor
>
> # We use the timestamp interceptor to record when Flume receives each event.
> # This is used for figuring out the bucket to which an event goes.
> collector110.sources.source1.interceptors.TimestampInterceptor.type = timestamp
>
> # We use the host interceptor to populate the host header with the fully
> # qualified domain name of the collector.
> # That way we know which file in the sink represents which collector.
> collector110.sources.source1.interceptors.HostInterceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
> collector110.sources.source1.interceptors.HostInterceptor.preserveExisting = false
> collector110.sources.source1.interceptors.HostInterceptor.useIP = false
> collector110.sources.source1.interceptors.HostInterceptor.hostHeader = host
>
>
> # Configure the first sink
>
> collector110.sinks.sink1.type = hdfs
>
> # Configure the bucketing
> collector110.sinks.sink1.hdfs.path = hdfs://namenode3001.ngpipes.milp.ngmoco.com:9000/ngpipes-raw-logs/%Y-%m-%d/%H00
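>
> # For example, an event received at epoch 1361974853210 ms (2013-02-27
> # 14:20:53 UTC) carries that value in its timestamp header, so the
> # %Y-%m-%d/%H00 escapes resolve to 2013-02-27/1400, which is exactly the
> # bucket shown in the listings above.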
>
> # Prefix the file with the source host so that we know where the events
> # in the file came from
> collector110.sinks.sink1.hdfs.filePrefix = %{host}
>
> # We roll the Flume output file on a time interval, currently every 5 minutes
> collector110.sinks.sink1.hdfs.rollSize = 0
> collector110.sinks.sink1.hdfs.rollCount = 0
> collector110.sinks.sink1.hdfs.rollInterval = 300
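>
> # Note: with rollSize and rollCount set to 0, size- and count-based rolling
> # are disabled, so the 300-second rollInterval is the only roll trigger.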
>
> # gzip compression-related settings
> collector110.sinks.sink1.hdfs.codeC = gzip
> collector110.sinks.sink1.hdfs.fileType = CompressedStream
> collector110.sinks.sink1.hdfs.fileSuffix = .gz
>
> # Configure the second sink
>
> collector110.sinks.sink2.type = hdfs
>
> # Configure the bucketing
> collector110.sinks.sink2.hdfs.path = hdfs://namenode5001.ngpipes.sac.ngmoco.com:9000/ngpipes-raw-logs/%Y-%m-%d/%H00
>
> # Prefix the file with the source host so that we know where the events
> # in the file came from
> collector110.sinks.sink2.hdfs.filePrefix = %{host}
>
> # We roll the Flume output file on a time interval, currently every 5 minutes
> collector110.sinks.sink2.hdfs.rollSize = 0
> collector110.sinks.sink2.hdfs.rollCount = 0
> collector110.sinks.sink2.hdfs.rollInterval = 300
>
> # gzip compression-related settings
> collector110.sinks.sink2.hdfs.codeC = gzip
> collector110.sinks.sink2.hdfs.fileType = CompressedStream
> collector110.sinks.sink2.hdfs.fileSuffix = .gz
>
> # Configure the channels that connect the source to the sinks
>
> # Use a channel that buffers events on the local filesystem
> collector110.channels.channel1.type = file
> collector110.channels.channel1.checkpointDir = /data/flume_data/channel1/checkpoint
> collector110.channels.channel1.dataDirs = /data/flume_data/channel1/data
>
> # Use a channel that buffers events on the local filesystem
> collector110.channels.channel2.type = file
> collector110.channels.channel2.checkpointDir = /data/flume_data/channel2/checkpoint
> collector110.channels.channel2.dataDirs = /data/flume_data/channel2/data
>
> # Bind the source and sinks to the channels configured above
> collector110.sources.source1.channels = channel1 channel2
> collector110.sinks.sink1.channel = channel1
> collector110.sinks.sink2.channel = channel2
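>
> # Note: listing both channels on the source relies on Flume's default
> # replicating channel selector, so every event is copied into channel1 and
> # channel2 and each sink receives the full stream.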
>
> Sagar
