flume-user mailing list archives

From Kathleen Ting <kathl...@apache.org>
Subject Re: HDFS sink - Property: hdfs.callTimeout
Date Thu, 04 Oct 2012 07:07:37 GMT
Hi Jagadish,

Increasing the hdfs.callTimeout parameter (default of 20 seconds) to
25 seconds can help with timeout exceptions.
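For example, using the sink name from the configuration quoted below (note that
the value is specified in milliseconds, so 25 seconds is 25000):

adServerAgent.sinks.hdfsSink.hdfs.callTimeout = 25000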

If, after increasing hdfs.callTimeout to 25 seconds, it's still timing
out, you'll want to investigate what's causing latency issues in your
HDFS cluster.
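For context, the "Callable timed out" error in the stack trace below arises
because the HDFS sink wraps each HDFS operation in a Future and bounds it with
hdfs.callTimeout. The following is a minimal, illustrative sketch of that
call-with-timeout pattern; the class and method names are assumptions for the
example and are not Flume's actual implementation:

```java
import java.util.concurrent.*;

public class CallWithTimeout {
    // Run a callable, failing with an IOException-wrapped TimeoutException
    // if it exceeds timeoutMs -- a simplified sketch of the pattern behind
    // HDFSEventSink.callWithTimeout, not Flume's real code.
    static <T> T callWithTimeout(ExecutorService pool, Callable<T> task, long timeoutMs)
            throws Exception {
        Future<T> future = pool.submit(task);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the stuck call
            throw new java.io.IOException("Callable timed out", e);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // A fast call completes normally.
        System.out.println(callWithTimeout(pool, () -> "ok", 1000));
        // A slow call (simulating a laggy HDFS operation) is cut off.
        try {
            callWithTimeout(pool, () -> { Thread.sleep(5000); return "late"; }, 200);
        } catch (java.io.IOException e) {
            System.out.println(e.getMessage());
        }
        pool.shutdownNow();
    }
}
```

Raising hdfs.callTimeout simply gives the Future.get() call more time before it
gives up, which is why it only masks, rather than fixes, HDFS-side latency.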

Regards, Kathleen

On Wed, Oct 3, 2012 at 11:39 PM, Jagadish Bihani
<jagadish.bihani@pubmatic.com> wrote:
> Hi
>
> What are the implications of the property "hdfs.callTimeout"? What adverse
> effects might it have if I change it?
>
> I am getting a timeout exception:
> Noted checkpoint for file: /home/hadoop/flume_channel/dataDir15/log-21, id: 21, checkpoint position: 1576210481
> 12/10/03 23:19:45 INFO file.LogFile: Closing /home/hadoop/flume_channel/dataDir15/log-21
> 12/10/03 23:19:55 WARN hdfs.HDFSEventSink: HDFS IO error
> java.io.IOException: Callable timed out
>         at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:343)
>         at org.apache.flume.sink.hdfs.HDFSEventSink.append(HDFSEventSink.java:714)
>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:412)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:736)
> Caused by: java.util.concurrent.TimeoutException
>         at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
>         at java.util.concurrent.FutureTask.get(FutureTask.java:91)
>         at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:336)
>         ... 5 more
>
>
> My configuration is:
>
> Agent A: Source
> ==========
>
> adServerAgent.sources = execSource
> adServerAgent.channels = fileChannel
> adServerAgent.sinks = avro-forward-sink1
> #adServerAgent.sinkgroups = failover_group
>
> # For each one of the sources, the type is defined
> adServerAgent.sources.execSource.type = exec
> adServerAgent.sources.execSource.command = /usr/bin/perl /home/http/flume/scripts/logtailDir_trial.pl 2>/tmp/logtail_failure.log
> adServerAgent.sources.execSource.restart=false
> adServerAgent.sources.execSource.batchSize = 1000
>
> # The channel can be defined as follows.
> adServerAgent.sources.execSource.channels = fileChannel
>
> # Each sink's type must be defined
> adServerAgent.sinks.avro-forward-sink1.type = avro
> adServerAgent.sinks.avro-forward-sink1.hostname=10.0.17.3
> adServerAgent.sinks.avro-forward-sink1.port=10012
> adServerAgent.sinks.avro-forward-sink1.connect-timeout = 300000
>
> #Specify the channel the sink should use
> adServerAgent.sinks.avro-forward-sink1.channel = fileChannel
> adServerAgent.channels.fileChannel.type=file
> adServerAgent.channels.fileChannel.dataDirs=/home/http/flume/channel/dataDir_trial
> adServerAgent.channels.fileChannel.checkpointDir=/home/http/flume/channel/checkpointDir_trial
> adServerAgent.channels.fileChannel.write-timeout=30
>
> where the script in the exec source just cats the files in the given
> directory:
>
> Exec script
> ========
> my $DIR = "/TRACKING_FILES/backuped";
> #my $MOVED_DIR = "";
> my $OFFSET_DIR = "$ENV{'HOME'}/flume/offset_dir_trial";
> my $SLEEP_TIME = 145;
> my $LOGATAIL_CMD = "$ENV{'HOME'}/flume/logtail_install/usr/sbin/logtail2";
> ################
>
> while(1)
> {
>         opendir(DIR,$DIR) or die "Couldn't open dir $DIR. $!";
> #       chomp(my @files = `ls $DIR`);
> #       foreach $file (@files)
>         while(my $file = readdir(DIR))
>         {
>                 #print $file."\n";
>                 #if($file =~ m/\d+impressionthread\d+\.tsv/)
>                 #{
>                         if(-f "$DIR/$file")
>                         {
>                         #       print "logtail2 -f $DIR/$file -o $OFFSET_DIR/$file.offset";
>                                 print `$LOGATAIL_CMD -f $DIR/$file -o $OFFSET_DIR/$file.offset`;
>                         }
>                 #}
>         }
>         closedir(DIR);
>         sleep($SLEEP_TIME);
> #       print "\n @files :".@files;
> }
>
>
> Agent B is: (Destination)
> ==============
>
> adServerAgent.sources = avro-collection-source
> adServerAgent.channels = fileChannel
> adServerAgent.sinks = hdfsSink fileSink
>
> # For each one of the sources, the type is defined
> adServerAgent.sources.avro-collection-source.type=avro
> adServerAgent.sources.avro-collection-source.bind=10.0.17.3
> adServerAgent.sources.avro-collection-source.port=10012
> adServerAgent.sources.avro-collection-source.interceptors = ts
> adServerAgent.sources.avro-collection-source.interceptors.ts.type = timestamp
> # The channel can be defined as follows.
> adServerAgent.sources.avro-collection-source.channels = fileChannel
>
> # Each sink's type must be defined
> adServerAgent.sinks.hdfsSink.type = hdfs
> adServerAgent.sinks.hdfsSink.hdfs.path=hdfs://mltest2001.pubmatic.com/flume/experiments_1_machine
> #adServerAgent.sinks.hdfsSink.hdfs.path=hdfs://mltest2001.pubmatic.com/flume/trackers
>
> #adServerAgent.sinks.hdfsSink.hdfs.fileType =DataStream
> adServerAgent.sinks.hdfsSink.hdfs.fileType =CompressedStream
> adServerAgent.sinks.hdfsSink.hdfs.filePrefix=adtrack_backup_%Y%m%d_%H%M%S_
> #adServerAgent.sinks.hdfsSink.hdfs.filePrefix=adtrack_backup_
> adServerAgent.sinks.hdfsSink.hdfs.rollSize=100000000
> adServerAgent.sinks.hdfsSink.hdfs.codeC=bzip2
> adServerAgent.sinks.hdfsSink.hdfs.rollCount=20000
> adServerAgent.sinks.hdfsSink.hdfs.batchSize=1
> #adServerAgent.sinks.hdfsSink.hdfs.writeFormat=Text
> adServerAgent.sinks.hdfsSink.hdfs.rollInterval=600
> adServerAgent.sinks.hdfsSink.hdfs.txnEventMax=1
> #adServerAgent.sinks.hdfsSink.hdfs.maxOpenFiles=20000
>
>
> #Define file sink
> adServerAgent.sinks.fileSink.type = file_roll
> adServerAgent.sinks.fileSink.sink.directory = /home/hadoop/flume_sink
> adServerAgent.sinks.hdfsSink.channel= fileChannel
> #adServerAgent.sinks.fileSink.channel = fileChannel
>
> # Each channel's type is defined.
> adServerAgent.channels.fileChannel.type=file
> adServerAgent.channels.fileChannel.dataDirs=/home/hadoop/flume_channel/dataDir15
> adServerAgent.channels.fileChannel.checkpointDir=/home/hadoop/flume_channel/checkpointDir15
> adServerAgent.channels.fileChannel.write-timeout=30
>
>
> Regards,
> Jagadish
