hadoop-mapreduce-user mailing list archives

From java8964 java8964 <java8...@hotmail.com>
Subject RE: All datanodes are bad IOException when trying to implement multithreading serialization
Date Mon, 30 Sep 2013 21:35:06 GMT
I don't know exactly what you are trying to do, but it seems like memory is your bottleneck,
and you think you have plenty of CPU, so you want to use multiple threads to make better use
of the CPU.
You can start multiple threads in your mapper if your mapper logic is very CPU intensive and
you want to speed it up. But reading the next split from the current mapper doesn't sound like
a good idea. Why do you want to do that? What happens if that split has already been allocated
to another mapper task?
If you have more CPU resources than memory in the cluster, it just means your cluster's
resources are not well balanced. If you cannot fix that at the physical level, leave it as is.
If you think it makes sense to use multiple threads inside the mapper logic, go ahead, but
consume only the current split. If the split is too small for the current mapper, change the
block size of the files this kind of mapper reads. In HDFS the block size is set at the
file level, so you can set it yourself.
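For example (just a sketch, untested; the path, replication factor and sizes are only for
illustration), you can pass the block size when you create the file:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class WriteWithCustomBlockSize {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Hypothetical input file; 256 MB blocks instead of the cluster default,
        // so one split (and therefore one mapper) covers more data.
        Path path = new Path("/data/my-input.txt");
        long blockSize = 256L * 1024 * 1024;

        FSDataOutputStream out = fs.create(
            path,
            true,                                       // overwrite
            conf.getInt("io.file.buffer.size", 4096),   // buffer size
            (short) 3,                                  // replication (illustrative)
            blockSize);
        out.writeBytes("example record\n");
        out.close();
      }
    }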
Yong

From: zhangyunming1990@gmail.com
Date: Sun, 29 Sep 2013 21:12:40 -0500
Subject: Re: All datanodes are bad IOException when trying to implement multithreading serialization
To: user@hadoop.apache.org

Thanks Sonal, Felix, I have looked into CombineFileInputFormat before.
The problem I am trying to solve here is that I want to reduce the number of mappers running
concurrently on a single node. Normally, on a machine with 8 GB of RAM and 8 cores, I need
to run 8 JVMs (mappers) to exploit all 8 cores. However, this limits the heap size of each
JVM (mapper) to 1 GB. I want to be able to use 2-4 JVMs (mappers) concurrently and still use
all 8 cores (this would allow me to set the heap size of each JVM to 2-4 GB). The additional
heap memory is important for my application. This means I use multithreading within a mapper
to use more than one core per JVM.
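For reference, the fewer-but-bigger-JVM part is just configuration; a rough sketch of what I
mean (Hadoop 1.x property names, values only illustrative). What it does not solve is keeping
all 8 cores busy from those 2-4 JVMs, which is why I want multithreading inside the mapper:

    import org.apache.hadoop.conf.Configuration;

    public class FewerFatterMappers {
      public static Configuration configure() {
        Configuration conf = new Configuration();

        // Per-task child JVM heap (Hadoop 1.x property name): 2 GB instead of 1 GB.
        conf.set("mapred.child.java.opts", "-Xmx2048m");

        // The number of concurrent map slots per node is a TaskTracker-side setting
        // (mapred.tasktracker.map.tasks.maximum in mapred-site.xml), not a per-job
        // knob, so it would have to be lowered from 8 to 4 on the cluster itself.
        return conf;
      }
    }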


The modifications I made were trying to have a single mapper read key/value pairs from the same
input split concurrently. As a result, I could process the input split using two or three
threads working on different portions of it.


Sorry if I did not make this clear in the previous email. I have written my own implementation
of Mapper's run method to accomplish this, but I also need LineRecordReader to read from the
input split concurrently. That's why I modified LineRecordReader in the way shown in the
attachment, and that is when I get the following exception:

java.io.IOException: Unexpected checksum mismatch while writing blk_-8687605593694081588_1014 from /192.168.102.99:40057
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.verifyChunks(BlockReceiver.java:221)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:447)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:532)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:398)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:107)
        at java.lang.Thread.run(Thread.java:619)

I suspect this might be related to threading and HDFS. Maybe I can't read a file in HDFS in a
multithreaded fashion (one thread from the beginning and another from the middle of the file,
for example)?


Any suggestions?
Thanks a lot!
Yunming


On Sun, Sep 29, 2013 at 8:58 PM, Felix Chern <idryman@gmail.com> wrote:

The number of mappers is usually the same as the number of files you feed to the job.
To reduce that number you can use CombineFileInputFormat.
I recently wrote an article about it; take a look and see if it fits your needs.
http://www.idryman.org/blog/2013/09/22/process-small-files-on-hadoop-using-combinefileinputformat-1/
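If your Hadoop version ships CombineTextInputFormat, the minimal wiring looks roughly like
this (a sketch; the 256 MB cap is just an example). Otherwise you subclass
CombineFileInputFormat yourself, as in the article:

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

    public class CombineSmallFiles {
      public static void configure(Job job) {
        // Pack many small files into fewer splits, so fewer mappers are launched.
        job.setInputFormatClass(CombineTextInputFormat.class);
        // Cap each combined split at roughly 256 MB (value is illustrative).
        CombineTextInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);
      }
    }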


Felix
On Sep 29, 2013, at 6:45 PM, yunming zhang <zhangyunming1990@gmail.com> wrote:


I am actually trying to reduce the number of mappers because my application takes up a lot
of memory (on the order of 1-2 GB of RAM per mapper). I want to be able to use a few mappers
but still maintain good CPU utilization through multithreading within a single mapper.
MultithreadedMapper doesn't work because it duplicates the in-memory data structures.
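(For reference, this is the knob I mean; a minimal sketch of how MultithreadedMapper is
normally wired up. It spawns one instance of the wrapped mapper per thread, so whatever each
mapper instance holds in memory gets duplicated. MyMapper here is just a placeholder.)

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;

    public class MultithreadedSetup {

      // Placeholder mapper; in my case it would be the memory-hungry one.
      public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {}

      public static void configure(Job job) {
        // MultithreadedMapper runs N map threads inside one task JVM, but each
        // thread gets its own MyMapper instance, so per-mapper in-memory
        // structures are duplicated N times, which is exactly my problem.
        job.setMapperClass(MultithreadedMapper.class);
        MultithreadedMapper.setMapperClass(job, MyMapper.class);
        MultithreadedMapper.setNumberOfThreads(job, 4);
      }
    }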




Thanks
Yunming

On Sun, Sep 29, 2013 at 6:59 PM, Sonal Goyal <sonalgoyal4@gmail.com> wrote:




Wouldn't you rather just change your split size so that you can have more mappers work on
your input? What else are you doing in the mappers?




Sent from my iPad
On Sep 30, 2013, at 2:22 AM, yunming zhang <zhangyunming1990@gmail.com> wrote:



Hi, 
I was playing with the Hadoop code, trying to have a single Mapper read an input split
using multiple threads. I am getting an "All datanodes are bad" IOException, and I am not sure
what the issue is.






The reason for this work is that I suspect my computation was slow because it takes too long
to create the Text() objects from the input split using a single thread. I tried to modify
LineRecordReader (since I am mostly using TextInputFormat) to provide additional methods for
retrieving lines from the input split: getCurrentKey2(), getCurrentValue2(), nextKeyValue2().
I created a second FSDataInputStream and a second LineReader object for getCurrentKey2() and
getCurrentValue2() to read from. Essentially I am trying to open the input split twice at
different start points (one at the very beginning, the other in the middle of the split) so
that two threads can read from the input split in parallel.
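Conceptually it looks like this (a stripped-down sketch of the idea, not the actual patch I
attached; the path and offsets are placeholders). Each thread gets its own stream and
LineReader positioned at its own offset:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.util.LineReader;

    public class TwoReadersOneSplit {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path file = new Path(args[0]);          // hypothetical split file
        long splitStart = 0L;
        long splitLength = fs.getFileStatus(file).getLen();
        long midpoint = splitStart + splitLength / 2;

        // Each reader gets its own stream; a single FSDataInputStream is not
        // safe to share between threads doing seek()/read() concurrently.
        FSDataInputStream in1 = fs.open(file);
        in1.seek(splitStart);
        LineReader reader1 = new LineReader(in1, conf);

        FSDataInputStream in2 = fs.open(file);
        in2.seek(midpoint);
        LineReader reader2 = new LineReader(in2, conf);
        // ... in the real code the second reader first skips the partial line
        // after the midpoint, and each reader stops at its own half's end ...

        Text line = new Text();
        reader1.readLine(line);
        reader2.readLine(line);
        reader1.close();
        reader2.close();
      }
    }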






I modified the org.apache.hadoop.mapreduce.Mapper.run() method to read simultaneously
using getCurrentKey() in Thread 1 and getCurrentKey2() in Thread 2 (both threads running
at the same time):



      Thread 1:
          while (context.nextKeyValue()) {
              map(context.getCurrentKey(), context.getCurrentValue(), context);
          }

      Thread 2:
          while (context.nextKeyValue2()) {
              map(context.getCurrentKey2(), context.getCurrentValue2(), context);
              //System.out.println("two iter");
          }
However, this causes the "All datanodes are bad" exception. I think I made sure that
I closed the second file. I have attached a copy of my LineRecordReader file to show what
I changed to enable two simultaneous reads of the input split.






I have modified other files (org.apache.hadoop.mapreduce.RecordReader.java, mapred.MapTask.java,
....) just to enable Mapper.run to call LineRecordReader.getCurrentKey2() and the other accessor
methods for the second stream.







I would really appreciate it if anyone could give me a bit of advice or just point me in the
right direction as to where the problem might be.


Thanks
Yunming 

<LineRecordReader.java>



 		 	   		  