hadoop-user mailing list archives

From Harsh J <ha...@cloudera.com>
Subject Re: IOException when using MultipleSequenceFileOutputFormat
Date Tue, 18 Sep 2012 03:59:39 GMT
Jason,

Perhaps then go with Jay's lead here: ulimits (nproc and nofile,
mostly). Can you check whether they are adequately high for opening
several blocks + sockets, for the user that runs the DataNode and for
the user that runs the TaskTracker (if in insecure mode)?
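
For example, something like this (just a rough sketch; it assumes the
DataNode and TaskTracker run as local users named "hdfs" and "mapred",
so substitute whichever users you actually run them as):

  # open-files (nofile) and max-processes (nproc) limits,
  # as seen by the DataNode user:
  su - hdfs -c 'ulimit -n -u'
  # ...and by the TaskTracker user:
  su - mapred -c 'ulimit -n -u'

If nofile comes back at the common 1024 default, that's easy to exhaust
when a task writes many files (blocks + sockets) at once.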

On Tue, Sep 18, 2012 at 9:14 AM, Jason Yang <lin.yang.jason@gmail.com> wrote:
> Hey, Harsh
>
> Thanks for your reply.
>
> There are 20 data files as input, and each of them would be clustered into 4
> groups. Since I use "DataFileName-groupNum" as the output key, there would be
> 80 unique keys in total.
>
> Following your suggestion, I have run the following two tests:
>
> 1) Try a smaller input: I randomly chose 5 files as input, and it always
> works fine!
> 2) Run it on a fully-distributed cluster: with the 20 data files as input it
> always works fine on the fully-distributed cluster, while it always fails on
> the pseudo-distributed cluster.
>
> So, it seems to be related to the xceiver/load limits you mentioned, and I
> have changed the xceiver value in hdfs-site.xml:
>
>
> <property>
>         <name>dfs.datanode.max.xcievers</name>
>         <value>4096</value>
> </property>
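>
> (From what I understand, the DataNode reads dfs.datanode.max.xcievers
> only at startup, so it needs a restart to pick up the new value -- in a
> stock pseudo-distributed setup, roughly:
>
>     bin/stop-dfs.sh && bin/start-dfs.sh
>
> from the Hadoop install directory.)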
>
>
> but I still get the same error when running with the 20 data files as input
> in the pseudo-distributed cluster.
>
> How could I fix this problem?
>
> 2012/9/18 Harsh J <harsh@cloudera.com>
>>
>> Hi Jason,
>>
>> How many unique keys are you going to be generating from this program,
>> roughly?
>>
>> By default, the max load of a DN is about 4k threads, and if you're
>> trying to push beyond that value then the NN will no longer select
>> the DN, as it would consider it already overloaded. In fully
>> distributed mode you may not see this issue, as there are several DNs
>> and TTs to distribute the write load across.
>>
>> Try a smaller input sample if there's a whole lot of keys you'll be
>> creating files for, and see if that works instead (such that there
>> are fewer files and you do not hit the xceiver/load limits).
>>
>> On Mon, Sep 17, 2012 at 7:20 PM, Jason Yang <lin.yang.jason@gmail.com>
>> wrote:
>> > Hi, all
>> >
>> > I have written a simple MR program which partitions a file into
>> > multiple files based on the clustering result of the points in that
>> > file; here is my code:
>> > ---
>> > private int run() throws IOException
>> > {
>> >     String scheme = getConf().get(CommonUtility.ATTR_SCHEME);
>> >     String ecgDir = getConf().get(CommonUtility.ATTR_ECG_DATA_DIR);
>> >     String outputDir = getConf().get(CommonUtility.ATTR_OUTPUT_DIR);
>> >
>> >     // create JobConf
>> >     JobConf jobConf = new JobConf(getConf(), this.getClass());
>> >
>> >     // set paths for input and output
>> >     Path inPath = new Path(scheme + ecgDir);
>> >     Path outPath = new Path(scheme + outputDir
>> >             + CommonUtility.OUTPUT_LOCAL_CLUSTERING);
>> >     FileInputFormat.addInputPath(jobConf, inPath);
>> >     FileOutputFormat.setOutputPath(jobConf, outPath);
>> >
>> >     // clear the output directory if it already exists
>> >     CommonUtility.deleteHDFSFile(outPath.toString());
>> >
>> >     // set formats for input and output
>> >     jobConf.setInputFormat(WholeFileInputFormat.class);
>> >     jobConf.setOutputFormat(LocalClusterMSFOutputFormat.class);
>> >
>> >     // set classes of output key and value
>> >     jobConf.setOutputKeyClass(Text.class);
>> >     jobConf.setOutputValueClass(RRIntervalWritable.class);
>> >
>> >     // set mapper and reducer
>> >     jobConf.setMapperClass(LocalClusteringMapper.class);
>> >     jobConf.setReducerClass(IdentityReducer.class);
>> >
>> >     // run the job
>> >     JobClient.runJob(jobConf);
>> >     return 0;
>> > }
>> >
>> > ...
>> >
>> > public class LocalClusteringMapper extends MapReduceBase implements
>> >         Mapper<NullWritable, BytesWritable, Text, RRIntervalWritable>
>> > {
>> >     @Override
>> >     public void map(NullWritable key, BytesWritable value,
>> >             OutputCollector<Text, RRIntervalWritable> output,
>> >             Reporter reporter) throws IOException
>> >     {
>> >         // read and cluster
>> >         ...
>> >
>> >         // output
>> >         Iterator<RRIntervalWritable> it = rrArray.iterator();
>> >         while (it.hasNext())
>> >         {
>> >             RRIntervalWritable rr = it.next();
>> >             Text outputKey = new Text(rr.clusterResult);
>> >             output.collect(outputKey, rr);
>> >         }
>> >     }
>> >
>> > ...
>> >
>> > public class LocalClusterMSFOutputFormat extends
>> >         MultipleSequenceFileOutputFormat<Text, RRIntervalWritable>
>> > {
>> >     @Override
>> >     protected String generateFileNameForKeyValue(Text key,
>> >             RRIntervalWritable value, String name)
>> >     {
>> >         // one output file per distinct cluster result
>> >         return value.clusterResult.toString();
>> >     }
>> > }
>> > ---
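>> >
>> > (A side note on the naming: since the map output key is already
>> > built from rr.clusterResult, the file name could equally well be
>> > derived from the key inside generateFileNameForKeyValue, e.g. simply:
>> >
>> >     return key.toString();
>> >
>> > Either way, every distinct cluster result becomes its own output
>> > file.)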
>> >
>> > But this program always gets an IOException when running in a
>> > pseudo-distributed cluster; the log is attached at the end of this
>> > post.
>> >
>> > There's something weird:
>> > 1. If I use SequenceFileOutputFormat instead of
>> > MultipleSequenceFileOutputFormat, this program works fine (at least
>> > there is no error in the log).
>> > 2. The output file which always causes the error is EcgData002509_LCF_3.
>> >
>> >
>> >>
>> >> 12/09/17 21:10:35 INFO mapred.MapTask: Starting flush of map output
>> >> 12/09/17 21:10:35 INFO mapred.MapTask: Finished spill 0
>> >> 12/09/17 21:10:35 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000019_0 is done. And is in the process of commiting
>> >> 12/09/17 21:10:35 INFO mapred.LocalJobRunner:
>> >> 12/09/17 21:10:35 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000019_0' done.
>> >> 12/09/17 21:10:35 INFO mapred.LocalJobRunner:
>> >> 12/09/17 21:10:35 INFO mapred.Merger: Merging 20 sorted segments
>> >> 12/09/17 21:10:35 INFO mapred.Merger: Merging 2 intermediate segments out of a total of 20
>> >> 12/09/17 21:10:35 INFO mapred.Merger: Merging 10 intermediate segments out of a total of 19
>> >> 12/09/17 21:10:35 INFO mapred.Merger: Down to the last merge-pass, with 10 segments left of total size: 18913891 bytes
>> >> 12/09/17 21:10:35 INFO mapred.LocalJobRunner:
>> >> 12/09/17 21:10:39 WARN hdfs.DFSClient: DataStreamer Exception: org.apache.hadoop.ipc.RemoteException: java.io.IOException: File /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3 could only be replicated to 0 nodes, instead of 1
>> >> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1271)
>> >> at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:422)
>> >> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> >> at java.lang.reflect.Method.invoke(Method.java:597)
>> >> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:508)
>> >> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:959)
>> >> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:955)
>> >> at java.security.AccessController.doPrivileged(Native Method)
>> >> at javax.security.auth.Subject.doAs(Subject.java:396)
>> >> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:953)
>> >> at org.apache.hadoop.ipc.Client.call(Client.java:740)
>> >> at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
>> >> at $Proxy0.addBlock(Unknown Source)
>> >> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> >> at java.lang.reflect.Method.invoke(Method.java:597)
>> >> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
>> >> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
>> >> at $Proxy0.addBlock(Unknown Source)
>> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.locateFollowingBlock(DFSClient.java:2937)
>> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.nextBlockOutputStream(DFSClient.java:2819)
>> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$2000(DFSClient.java:2102)
>> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2288)
>> >> 12/09/17 21:10:39 WARN hdfs.DFSClient: Error Recovery for block null bad datanode[0] nodes == null
>> >> 12/09/17 21:10:39 WARN hdfs.DFSClient: Could not get block locations. Source file "/work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3" - Aborting...
>> >> 12/09/17 21:10:41 INFO mapred.LocalJobRunner: reduce > reduce
>> >> 12/09/17 21:10:42 INFO mapred.JobClient:  map 100% reduce 89%
>> >> 12/09/17 21:10:42 WARN mapred.LocalJobRunner: job_local_0001
>> >> org.apache.hadoop.ipc.RemoteException: java.io.IOException: File /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3 could only be replicated to 0 nodes, instead of 1
>> >> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1271)
>> >> at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:422)
>> >> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> >> at java.lang.reflect.Method.invoke(Method.java:597)
>> >> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:508)
>> >> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:959)
>> >> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:955)
>> >> at java.security.AccessController.doPrivileged(Native Method)
>> >> at javax.security.auth.Subject.doAs(Subject.java:396)
>> >> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:953)
>> >> at org.apache.hadoop.ipc.Client.call(Client.java:740)
>> >> at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
>> >> at $Proxy0.addBlock(Unknown Source)
>> >> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> >> at java.lang.reflect.Method.invoke(Method.java:597)
>> >> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
>> >> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
>> >> at $Proxy0.addBlock(Unknown Source)
>> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.locateFollowingBlock(DFSClient.java:2937)
>> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.nextBlockOutputStream(DFSClient.java:2819)
>> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$2000(DFSClient.java:2102)
>> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2288)
>> >> 12/09/17 21:10:43 INFO mapred.JobClient: Job complete: job_local_0001
>> >> 12/09/17 21:10:43 INFO mapred.JobClient: Counters: 15
>> >> 12/09/17 21:10:43 INFO mapred.JobClient:   FileSystemCounters
>> >> 12/09/17 21:10:43 INFO mapred.JobClient:     FILE_BYTES_READ=23297226
>> >> 12/09/17 21:10:43 INFO mapred.JobClient:     HDFS_BYTES_READ=546711709
>> >> 12/09/17 21:10:43 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=232075142
>> >> 12/09/17 21:10:43 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=13530993
>> >> 12/09/17 21:10:43 INFO mapred.JobClient:   Map-Reduce Framework
>> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Reduce input groups=56
>> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Combine output records=0
>> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Map input records=20
>> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Reduce shuffle bytes=0
>> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Reduce output records=38837
>> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Spilled Records=102562
>> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Map output bytes=18691072
>> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Map input bytes=28649088
>> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Combine input records=0
>> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Map output records=55700
>> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Reduce input records=38838
>> >> 12/09/17 21:10:44 INFO mapred.LocalJobRunner: reduce > reduce
>> >> Exception in thread "main" java.io.IOException: Job failed!
>> >> at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1252)
>> >> at org.yanglin.mr.lab.ecg.PESCJob.runLocalClustering(PESCJob.java:111)
>> >> at org.yanglin.mr.lab.ecg.PESCJob.run(PESCJob.java:57)
>> >> at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
>> >> at org.yanglin.mr.lab.ecg.PESCJob.main(PESCJob.java:117)
>> >> 12/09/17 21:10:48 ERROR hdfs.DFSClient: Exception closing file /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3 : org.apache.hadoop.ipc.RemoteException: java.io.IOException: File /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3 could only be replicated to 0 nodes, instead of 1
>> >> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1271)
>> >> at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:422)
>> >> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> >> at java.lang.reflect.Method.invoke(Method.java:597)
>> >> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:508)
>> >> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:959)
>> >> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:955)
>> >> at java.security.AccessController.doPrivileged(Native Method)
>> >> at javax.security.auth.Subject.doAs(Subject.java:396)
>> >> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:953)
>> >> org.apache.hadoop.ipc.RemoteException: java.io.IOException: File /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3 could only be replicated to 0 nodes, instead of 1
>> >> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1271)
>> >> at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:422)
>> >> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> >> at java.lang.reflect.Method.invoke(Method.java:597)
>> >> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:508)
>> >> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:959)
>> >> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:955)
>> >> at java.security.AccessController.doPrivileged(Native Method)
>> >> at javax.security.auth.Subject.doAs(Subject.java:396)
>> >> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:953)
>> >> at org.apache.hadoop.ipc.Client.call(Client.java:740)
>> >> at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
>> >> at $Proxy0.addBlock(Unknown Source)
>> >> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> >> at java.lang.reflect.Method.invoke(Method.java:597)
>> >> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
>> >> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
>> >> at $Proxy0.addBlock(Unknown Source)
>> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.locateFollowingBlock(DFSClient.java:2937)
>> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.nextBlockOutputStream(DFSClient.java:2819)
>> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$2000(DFSClient.java:2102)
>> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2288)
>> >
>> >
>> >
>> >
>> > --
>> > YANG, Lin
>> >
>>
>>
>>
>> --
>> Harsh J
>
>
>
>
> --
> YANG, Lin
>



-- 
Harsh J
