flume-user mailing list archives

From Jagadish Bihani <jagadish.bih...@pubmatic.com>
Subject Re: HDFS file rolling behaviour
Date Thu, 20 Sep 2012 06:45:08 GMT
Hi

This is my *destination agent* config (which has an HDFS sink):
=============================================
agent.sources = avro-collection-source
agent.channels = fileChannel
agent.sinks = hdfsSink fileSink

# For each one of the sources, the type is defined
agent.sources.avro-collection-source.type=avro
agent.sources.avro-collection-source.bind=10.0.17.3
agent.sources.avro-collection-source.port=10011
agent.sources.avro-collection-source.interceptors = ts
agent.sources.avro-collection-source.interceptors.ts.type = timestamp
# The channel can be defined as follows.
agent.sources.avro-collection-source.channels = fileChannel

# Each sink's type must be defined
agent.sinks.hdfsSink.type = hdfs
#agent.sinks.hdfsSink.hdfs.path=hdfs://mltest2001.pubmatic.com/flume/experiments
agent.sinks.hdfsSink.hdfs.path=hdfs://mltest2001.pubmatic.com/flume/tracker3

#agent.sinks.hdfsSink.hdfs.fileType =DataStream
agent.sinks.hdfsSink.hdfs.fileType =CompressedStream
agent.sinks.hdfsSink.hdfs.filePrefix=adtrack_backup_sizeroll_%Y%m%d_%H%M%S_
#agent.sinks.hdfsSink.hdfs.filePrefix=adtrack_backup_
agent.sinks.hdfsSink.hdfs.rollSize=0
agent.sinks.hdfsSink.hdfs.codeC=bzip2
agent.sinks.hdfsSink.hdfs.rollCount=0
agent.sinks.hdfsSink.hdfs.batchSize=1000
#agent.sinks.hdfsSink.hdfs.writeFormat=Text
agent.sinks.hdfsSink.hdfs.rollInterval=180


#Define file sink
agent.sinks.fileSink.type = file_roll
agent.sinks.fileSink.sink.directory = /root/flume_sink

#Specify the channel the sink should use
agent.sinks.hdfsSink.channel= fileChannel
agent.channels.fileChannel.type=file
agent.channels.fileChannel.dataDirs=/root/flume_channel/dataDir13
agent.channels.fileChannel.checkpointDir=/root/flume_channel/checkpointDir13
agent.channels.fileChannel.write-timeout=30
================================================================

This is my *source agent* config (which has an avro sink):
=============================================
adServerAgent.sources = execSource
adServerAgent.channels = fileChannel
adServerAgent.sinks = avro-forward-sink1
#adServerAgent.sinkgroups = failover_group

# For each one of the sources, the type is defined
adServerAgent.sources.execSource.type = exec
adServerAgent.sources.execSource.command = /usr/bin/perl /root/flume/scripts/logtailDir.pl
adServerAgent.sources.execSource.restart=false
adServerAgent.sources.execSource.batchSize = 1000

# The channel can be defined as follows.
adServerAgent.sources.execSource.channels = fileChannel

# Each sink's type must be defined
adServerAgent.sinks.avro-forward-sink1.type = avro
adServerAgent.sinks.avro-forward-sink1.hostname=10.0.17.3
adServerAgent.sinks.avro-forward-sink1.port=10011
adServerAgent.sinks.avro-forward-sink1.connect-timeout = 300000
adServerAgent.sinks.avro-forward-sink1.channel = fileChannel

adServerAgent.channels.fileChannel.type=file
adServerAgent.channels.fileChannel.dataDirs=/root/flume/channel/dataDir1
adServerAgent.channels.fileChannel.checkpointDir=/root/flume/channel/checkpointDir1
adServerAgent.channels.fileChannel.write-timeout=30
===============================================================================

*OS open file limit for the user:*
Soft limit: 75000
Hard limit: 150000
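
(For reference, a sketch of how limits like these are typically set in
/etc/security/limits.conf; the user name below is just a placeholder
for whichever account runs the agent:

# /etc/security/limits.conf entries (placeholder user name)
flume   soft    nofile  75000
flume   hard    nofile  150000

and then verified from a fresh login shell for that user with
"ulimit -Sn" and "ulimit -Hn".)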

===============================================================================
*Problem*:
1. When I set the rolling interval to, say, x seconds (in my case 600 sec)
at the destination sink, the agent creates many .tmp files at the same
time when it receives data, and after the rolling interval has elapsed
it syncs them to HDFS. Thus in 600 seconds I get many small files
instead of the 1 file I expected. Why are multiple connections made
from the avro sink to the destination agent? Or is it related to the
HDFS sink batch size? (My batch size is 1000.)
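
For quick reference, the roll-related part of the HDFS sink config
boils down to the following (values as in the config above; I vary the
interval between runs, the 180 sec value shown is from the latest run).
As I understand it, 0 disables the size- and count-based roll triggers,
and batchSize only controls flushing, not when a new file is started:

agent.sinks.hdfsSink.hdfs.rollInterval=180
# 0 = size-based rolling disabled
agent.sinks.hdfsSink.hdfs.rollSize=0
# 0 = count-based rolling disabled
agent.sinks.hdfsSink.hdfs.rollCount=0
# events written to the file before a flush/sync to HDFS (not a roll trigger)
agent.sinks.hdfsSink.hdfs.batchSize=1000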

2. Up to a certain rolling interval (in my case 180 sec) Flume works
fine, i.e. it creates files in HDFS (though multiple files within the
same rolling interval). But after increasing the rolling interval by a
few seconds, it gives exceptions like:

WARN hdfs.DFSClient: Unable to persist blocks in hflush for /flume/tracker3/adtrack_backup_sizeroll_20120919_230206_.1348121067092.bz2.tmp
java.io.IOException: Call to mltest2001.pubmatic.com/10.0.17.3:8020 failed on local exception: java.io.IOException: Too many open files
         at org.apache.hadoop.ipc.Client.wrapException(Client.java:1129)
         at org.apache.hadoop.ipc.Client.call(Client.java:1097)
         at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:229)
         at $Proxy6.fsync(Unknown Source)
         at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
         at java.lang.reflect.Method.invoke(Method.java:597)
         at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:85)
         at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:62)
         at $Proxy6.fsync(Unknown Source)
         at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3805)
         at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
         at org.apache.flume.sink.hdfs.HDFSCompressedDataStream.sync(HDFSCompressedDataStream.java:96)
         at org.apache.flume.sink.hdfs.BucketWriter.doFlush(BucketWriter.java:292)
         at org.apache.flume.sink.hdfs.BucketWriter.access$500(BucketWriter.java:48)
         at org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:281)
         at org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:278)
         at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
         at org.apache.flume.sink.hdfs.BucketWriter.flush(BucketWriter.java:278)
         at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:732)
         at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
         at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
         at java.util.concurrent.FutureTask.run(FutureTask.java:138)
         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
         at java.lang.Thread.run(Thread.java:662)
Caused by: java.io.IOException: Too many open files
         at sun.nio.ch.IOUtil.initPipe(Native Method)
         at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:49)
         at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:18)
         at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.get(SocketIOWithTimeout.java:407)
         at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:322)
         at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:155)
         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:128)
         at java.io.FilterInputStream.read(FilterInputStream.java:116)
         at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:346)
         at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
         at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
         at java.io.DataInputStream.readInt(DataInputStream.java:370)
         at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:826)
         at org.apache.hadoop.ipc.Client$Connection.run(Client.java:771)
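
(For what it's worth, a quick way to check whether the agent is really
hitting the descriptor limit would be something like the following,
with <pid> as a placeholder for the Flume agent's process id:

# open descriptors currently held by the agent process
ls /proc/<pid>/fd | wc -l
# effective limit the process actually inherited
grep "Max open files" /proc/<pid>/limits

That should at least confirm whether the 75000 soft limit set for the
user is in effect for the agent process itself.)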


Regards,
Jagadish




On 09/18/2012 08:07 PM, Brock Noland wrote:
> If you have not increased the OS number of open files limit, you 
> should. The default limit of 1024 is too low for nearly every modern 
> application.
>
> In regards to the rolling, can you paste your config and describe in 
> more detail the unexpected behavior you are seeing?
>
> Brock
>
> On Tue, Sep 18, 2012 at 7:08 AM, Jagadish Bihani 
> <jagadish.bihani@pubmatic.com <mailto:jagadish.bihani@pubmatic.com>> 
> wrote:
>
>     Hi
>
>     Does anybody know about  the issue mentioned in the following mail?
>
>
>     Update: I have now seen the following behaviour even for time-based
>     rolling. With time-based rolling I would expect a single file to be
>     created after x seconds, but in my case some n files are created
>     every x seconds.
>     Is it something to do with the HDFS batch size?
>
>     Regards,
>     Jagadish
>
>
>
>     -------- Original Message --------
>     Subject: 	HDFS file rolling behaviour
>     Date: 	Thu, 13 Sep 2012 14:26:56 +0530
>     From: 	Jagadish Bihani <jagadish.bihani@pubmatic.com>
>     <mailto:jagadish.bihani@pubmatic.com>
>     To: 	user@flume.apache.org <mailto:user@flume.apache.org>
>
>
>
>     Hi
>
>     I use two flume agents:
>     1. flume_agent 1 which is a source with (exec source -file channel
>     -avro sink)
>     2. flume_agent 2 which is a dest with (avro source -file channel -
>     HDFS sink)
>
>     I have observed that for the HDFS sink with rolling by *file
>     size/number of events* it creates a lot of simultaneous connections
>     to the source's avro sink. But while rolling by *time interval* it
>     does it *one by one*, i.e. it opens 1 HDFS file, writes to it and
>     then closes it. I expect the same thing to happen for the other
>     rolling criteria too, i.e. first open a file, and once x events have
>     been written to it, roll it and open another, and so on.
>
>     In my case my data ingestion works fine with "time"-based rolling,
>     but in the other cases, due to the above behaviour, I get exceptions
>     like:
>     -- too many open files
>     -- timeout-related exceptions for the file channel, and a few more
>
>     I can increase the values of the parameters causing the exceptions,
>     but I don't know what adverse effects that may have.
>
>     Can somebody throw some light on rolling based on file size/number
>     of events?
>
>     Regards,
>     Jagadish
>
>
>
>
>
>
> -- 
> Apache MRUnit - Unit testing MapReduce - 
> http://incubator.apache.org/mrunit/

