From: Jason Yang <lin.yang.jason@gmail.com>
Date: Tue, 18 Sep 2012 11:44:08 +0800
Subject: Re: IOException when using MultipleSequenceFileOutputFormat
To: user@hadoop.apache.org

Hey, Harsh

Thanks for your reply.

There are 20 data files as input, and each of them is clustered into 4 groups. Since I used "DataFileName-groupNum" as the output key, there are 80 unique keys in total.
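
If I read the old-API MultipleOutputFormat correctly, it lazily opens one writer per distinct leaf name and keeps all of them open until the task finishes, so my single local reducer would end up holding roughly 80 HDFS output streams at once. A rough sketch of that caching behaviour as I understand it (hypothetical names, for illustration only; not code from my job):

// one RecordWriter per generated file name, all held open until close()
private final Map<String, RecordWriter<Text, RRIntervalWritable>> writers =
        new TreeMap<String, RecordWriter<Text, RRIntervalWritable>>();

private RecordWriter<Text, RRIntervalWritable> writerFor(String leafName)
        throws IOException
{
    RecordWriter<Text, RRIntervalWritable> w = writers.get(leafName);
    if (w == null)
    {
        // hypothetical helper: opens a new SequenceFile stream on HDFS
        w = openNewSequenceFileWriter(leafName);
        writers.put(leafName, w); // cached and kept open until the task ends
    }
    return w;
}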

Following your suggestion, I have run the following two tests:

1) Try a smaller input: I chose 5 files randomly as input, and it always works fine!
2) Run it on a fully-distributed cluster: with all 20 data files as input it always works fine on the fully-distributed cluster, while it always fails on the pseudo-distributed cluster.

So it seems to be related to the xceiver/load limits you mentioned, and I have changed the xceiver value in hdfs-site.xml:


<property>
        <name>dfs.datanode.max.xcievers</name>
        <value>4096</value>
</property>

but I still got the same error when running with all 20 data files as input on the pseudo-distributed cluster.

How could I fix this problem?

2012/9/18 Harsh J <harsh@cloudera.com>
Hi Jason,

How many unique keys are you going to be generating from this program, roughly?

By default, the max load of a DN is about 4k threads, and if you're
trying to push beyond that value the NN will no longer select the
DN, as it would consider it already overloaded. In fully-distributed
mode you may not see this issue, as there are several DNs and TTs to
distribute the write load across.
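
To make "overloaded" concrete, the target selection does roughly the following when picking DNs for a new block (a simplified sketch from memory, not the exact NameNode code):

// a DN is skipped as a replica target when its active xceiver count is
// well above the cluster-wide average; with a single DN, one busy node
// turns into "could only be replicated to 0 nodes, instead of 1"
boolean tooBusy = node.getXceiverCount() > 2 * avgXceiverCountAcrossDNs;
if (tooBusy) {
    continue; // exclude this DN from the candidate list
}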

Try with a smaller input sample if there's a whole lot of keys you'll
be creating files for, and see if that works instead (such that
there are fewer files and you do not hit the xceiver/load limits).
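
If you do need all 20 files in one run, another option is to fold the generated file names into a fixed number of buckets, so only a bounded number of writers is ever open at a time. You lose the one-file-per-cluster naming, but you stay under the limit. An untested sketch against the 0.20 API (adjust the names to your classes):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;

public class BucketedMSFOutputFormat extends
        MultipleSequenceFileOutputFormat<Text, RRIntervalWritable>
{
    // keep this well below the DN's xceiver/load limit
    private static final int NUM_BUCKETS = 16;

    @Override
    protected String generateFileNameForKeyValue(Text key,
            RRIntervalWritable value, String name)
    {
        // map many cluster results onto one physical file per bucket
        int bucket = (value.clusterResult.toString().hashCode()
                & Integer.MAX_VALUE) % NUM_BUCKETS;
        return "bucket-" + bucket;
    }
}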

On Mon, Sep 17, 2012 at 7:20 PM, Jason Yang <lin.yang.jason@gmail.com> wrote:
> Hi, all
>
> I have written a simple MR program which partitions a file into multiple
> files based on the clustering result of the points in this file; here is my
> code:
> ---
> private int run() throws IOException
> {
>     String scheme = getConf().get(CommonUtility.ATTR_SCHEME);
>     String ecgDir = getConf().get(CommonUtility.ATTR_ECG_DATA_DIR);
>     String outputDir = getConf().get(CommonUtility.ATTR_OUTPUT_DIR);
>
>     // create JobConf
>     JobConf jobConf = new JobConf(getConf(), this.getClass());
>
>     // set paths for input and output
>     Path inPath = new Path(scheme + ecgDir);
>     Path outPath = new Path(scheme + outputDir +
>             CommonUtility.OUTPUT_LOCAL_CLUSTERING);
>     FileInputFormat.addInputPath(jobConf, inPath);
>     FileOutputFormat.setOutputPath(jobConf, outPath);
>
>     // clear the output directory if it already exists
>     CommonUtility.deleteHDFSFile(outPath.toString());
>
>     // set formats for input and output
>     jobConf.setInputFormat(WholeFileInputFormat.class);
>     jobConf.setOutputFormat(LocalClusterMSFOutputFormat.class);
>
>     // set classes of output key and value
>     jobConf.setOutputKeyClass(Text.class);
>     jobConf.setOutputValueClass(RRIntervalWritable.class);
>
>     // set mapper and reducer
>     jobConf.setMapperClass(LocalClusteringMapper.class);
>     jobConf.setReducerClass(IdentityReducer.class);
>
>     // run the job
>     JobClient.runJob(jobConf);
>     return 0;
> }
>
> ...
>
> public class LocalClusteringMapper extends MapReduceBase implements
>         Mapper<NullWritable, BytesWritable, Text, RRIntervalWritable>
> {
>     @Override
>     public void map(NullWritable key, BytesWritable value,
>             OutputCollector<Text, RRIntervalWritable> output, Reporter reporter)
>             throws IOException
>     {
>         // read and cluster
>         ...
>
>         // output one record per RR interval, keyed by its cluster result
>         Iterator<RRIntervalWritable> it = rrArray.iterator();
>         while (it.hasNext())
>         {
>             RRIntervalWritable rr = it.next();
>
>             Text outputKey = new Text(rr.clusterResult);
>
>             output.collect(outputKey, rr);
>         }
>     }
>
> ...
>
> public class LocalClusterMSFOutputFormat extends
>         MultipleSequenceFileOutputFormat<Text, RRIntervalWritable>
> {
>     // every distinct string returned here becomes its own output file,
>     // so the number of open writers tracks the number of unique keys
>     protected String generateFileNameForKeyValue(Text key,
>             RRIntervalWritable value, String name)
>     {
>         return value.clusterResult.toString();
>     }
> }
> ---
>
> But this program always gets an IOException when running in a
> pseudo-distributed cluster; the log is attached at the end of this
> post.
>
> There's something weird:
> 1. If I use SequenceFileOutputFormat instead of
> MultipleSequenceFileOutputFormat, the program works fine (at least
> there is no error in the log).
> 2. The file which always causes the error is EcgData002509_LCF_3.
>
>
>>
>> 12/09/17 21:10:35 INFO mapred.MapTask: Starting flush of map output
>> 12/09/17 21:10:35 INFO mapred.MapTask: Finished spill 0
>> 12/09/17 21:10:35 INFO mapred.TaskRunner:
>> Task:attempt_local_0001_m_000019_0 is done. And is in the process of
>> commiting
>> 12/09/17 21:10:35 INFO mapred.LocalJobRunner:
>> 12/09/17 21:10:35 INFO mapred.TaskRunner: Task
>> 'attempt_local_0001_m_000019_0' done.
>> 12/09/17 21:10:35 INFO mapred.LocalJobRunner:
>> 12/09/17 21:10:35 INFO mapred.Merger: Merging 20 sorted segments
>> 12/09/17 21:10:35 INFO mapred.Merger: Merging 2 intermediate segments out
>> of a total of 20
>> 12/09/17 21:10:35 INFO mapred.Merger: Merging 10 intermediate segments out
>> of a total of 19
>> 12/09/17 21:10:35 INFO mapred.Merger: Down to the last merge-pass, with 10
>> segments left of total size: 18913891 bytes
>> 12/09/17 21:10:35 INFO mapred.LocalJobRunner:
>> 12/09/17 21:10:39 WARN hdfs.DFSClient: DataStreamer Exception:
>> org.apache.hadoop.ipc.RemoteException: java.io.IOException: File
>> /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3
>> could only be replicated to 0 nodes, instead of 1
>> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1271)
>> at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:422)
>> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:508)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:959)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:955)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:396)
>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:953)
>> at org.apache.hadoop.ipc.Client.call(Client.java:740)
>> at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
>> at $Proxy0.addBlock(Unknown Source)
>> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
>> at $Proxy0.addBlock(Unknown Source)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.locateFollowingBlock(DFSClient.java:2937)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.nextBlockOutputStream(DFSClient.java:2819)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$2000(DFSClient.java:2102)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2288)
>> 12/09/17 21:10:39 WARN hdfs.DFSClient: Error Recovery for block null bad
>> datanode[0] nodes == null
>> 12/09/17 21:10:39 WARN hdfs.DFSClient: Could not get block locations.
>> Source file
>> "/work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3"
>> - Aborting...
>> 12/09/17 21:10:41 INFO mapred.LocalJobRunner: reduce > reduce
>> 12/09/17 21:10:42 INFO mapred.JobClient:  map 100% reduce 89%
>> 12/09/17 21:10:42 WARN mapred.LocalJobRunner: job_local_0001
>> org.apache.hadoop.ipc.RemoteException: java.io.IOException: File
>> /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3
>> could only be replicated to 0 nodes, instead of 1
>> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1271)
>> at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:422)
>> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:508)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:959)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:955)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:396)
>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:953)
>> at org.apache.hadoop.ipc.Client.call(Client.java:740)
>> at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
>> at $Proxy0.addBlock(Unknown Source)
>> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
>> at $Proxy0.addBlock(Unknown Source)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.locateFollowingBlock(DFSClient.java:2937)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.nextBlockOutputStream(DFSClient.java:2819)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$2000(DFSClient.java:2102)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2288)
>> 12/09/17 21:10:43 INFO mapred.JobClient: Job complete: job_local_0001
>> 12/09/17 21:10:43 INFO mapred.JobClient: Counters: 15
>> 12/09/17 21:10:43 INFO mapred.JobClient:   FileSystemCounters
>> 12/09/17 21:10:43 INFO mapred.JobClient:     FILE_BYTES_READ=23297226
>> 12/09/17 21:10:43 INFO mapred.JobClient:     HDFS_BYTES_READ=546711709
>> 12/09/17 21:10:43 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=232075142
>> 12/09/17 21:10:43 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=13530993
>> 12/09/17 21:10:43 INFO mapred.JobClient:   Map-Reduce Framework
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Reduce input groups=56
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Combine output records=0
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Map input records=20
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Reduce shuffle bytes=0
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Reduce output records=38837
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Spilled Records=102562
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Map output bytes=18691072
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Map input bytes=28649088
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Combine input records=0
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Map output records=55700
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Reduce input records=38838
>> 12/09/17 21:10:44 INFO mapred.LocalJobRunner: reduce > reduce
>> Exception in thread "main" java.io.IOException: Job failed!
>> at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1252)
>> at org.yanglin.mr.lab.ecg.PESCJob.runLocalClustering(PESCJob.java:111)
>> at org.yanglin.mr.lab.ecg.PESCJob.run(PESCJob.java:57)
>> at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
>> at org.yanglin.mr.lab.ecg.PESCJob.main(PESCJob.java:117)
>> 12/09/17 21:10:48 ERROR hdfs.DFSClient: Exception closing file
>> /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3
>> : org.apache.hadoop.ipc.RemoteException: java.io.IOException: File
>> /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3
>> could only be replicated to 0 nodes, instead of 1
>> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1271)
>> at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:422)
>> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:508)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:959)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:955)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:396)
>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:953)
>> org.apache.hadoop.ipc.RemoteException: java.io.IOException: File
>> /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3
>> could only be replicated to 0 nodes, instead of 1
>> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1271)
>> at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:422)
>> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:508)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:959)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:955)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:396)
>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:953)
>> at org.apache.hadoop.ipc.Client.call(Client.java:740)
>> at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
>> at $Proxy0.addBlock(Unknown Source)
>> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
>> at $Proxy0.addBlock(Unknown Source)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.locateFollowingBlock(DFSClient.java:2937)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.nextBlockOutputStream(DFSClient.java:2819)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$2000(DFSClient.java:2102)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2288)
>
>
>
>
> --
> YANG, Lin
>



--
Harsh J



--
YANG, Lin
