hadoop-common-user mailing list archives

From "Tu, Tiankai" <Tiankai...@DEShawResearch.com>
Subject RE: Hadoop/HDFS for scientific simulation output data analysis
Date Fri, 03 Apr 2009 20:41:59 GMT
Thanks for the update and suggestion, Matei. 

I can certainly construct an input text file containing all the files of
a dataset
(http://hadoop.apache.org/core/docs/r0.19.1/streaming.html#How+do+I+process+files%2C+one+per+map%3F),
then let the jobtracker dispatch the file names to the maps and open the
files directly from within the map method. But the jobtracker merely
treats the file names as text input and makes no effort to assign a file
(name) to the nodes that store that file. As a result, a node opening a
file is almost certain to request data from a different data node, which
destroys IO locality (the very strength of Hadoop) and results in worse
performance. (I had verified this behavior earlier using Hadoop
streaming.)
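Concretely, the mapper side of that one-file-per-map setup can be
sketched as follows. This is a hypothetical Python illustration, not my
actual script; it only shows the streaming contract, where each input
line carries a file name (possibly preceded by a tab-separated key,
depending on the input format):

```python
import sys

def filenames_from_stream(lines):
    """Pull file names out of streaming mapper input.

    Each line is either a bare file name or "key<TAB>name", so take
    the last tab-separated field as the name.
    """
    names = []
    for raw in lines:
        line = raw.rstrip("\n")
        if line:
            names.append(line.split("\t")[-1])
    return names

def mapper(stream=sys.stdin):
    # A real mapper would open each name from HDFS here (for example
    # by shelling out to `hadoop fs -cat`) and process the bytes;
    # echoing the name is just a placeholder.
    for name in filenames_from_stream(stream):
        print(name)
```

Nothing in this sketch steers the map to a node holding the file, which
is exactly the locality problem described above.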

By the way, what is the largest size---in terms of total bytes, number
of files, and number of nodes---in your applications? Thanks.


-----Original Message-----
From: Matei Zaharia [mailto:matei@cloudera.com] 
Sent: Friday, April 03, 2009 1:18 PM
To: core-user@hadoop.apache.org
Subject: Re: Hadoop/HDFS for scientific simulation output data analysis

Hadoop does checksums for each small chunk of the file (512 bytes by
default) and stores a list of checksums for each chunk with the file,
rather than storing just one checksum at the end. This makes it easier
to read only a part of a file and know that it's not corrupt, without
having to read and checksum the whole file. It also lets you use
smaller / simpler checksums for each chunk, making them more efficient
to compute than the larger checksum that would be needed to provide the
same level of safety for the whole file.
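In other words, something like this toy model (CRC-32 here just stands
in for the actual checksum algorithm HDFS uses; the point is the
per-chunk bookkeeping, which lets a reader verify only the chunks
overlapping the byte range it wants):

```python
import zlib

CHUNK = 512  # the io.bytes.per.checksum default

def chunk_checksums(data):
    """One CRC-32 per CHUNK-byte slice of the file's bytes."""
    return [zlib.crc32(data[i:i + CHUNK])
            for i in range(0, len(data), CHUNK)]

def verify_range(data, sums, start, end):
    """Verify only the chunks overlapping bytes [start, end);
    the rest of the file is never read or checksummed."""
    first = start // CHUNK
    last = (max(end, start + 1) - 1) // CHUNK
    for c in range(first, last + 1):
        chunk = data[c * CHUNK:(c + 1) * CHUNK]
        if zlib.crc32(chunk) != sums[c]:
            return False
    return True
```

With one whole-file checksum instead, any partial read would force a
full-file scan to detect corruption.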

The default buffer size is confusingly not 64 KB, it's 4 KB. It really
is bad for performance as you saw. But I'd recommend trying 64 or 128 KB
before jumping to 64 MB. 128 KB is also the setting Yahoo used in its
2000-node performance tests (see http://wiki.apache.org/hadoop/FAQ).
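For reference, the relevant hadoop-site.xml fragment would look
something like this (property names as of the 0.19 line; the values are
the 128 KB buffer suggested above and the default 512-byte checksum
granularity):

```xml
<property>
  <name>io.file.buffer.size</name>
  <value>131072</value>  <!-- 128 KB, as in Yahoo's 2000-node tests -->
</property>
<property>
  <name>io.bytes.per.checksum</name>
  <value>512</value>  <!-- leave the checksum granularity at its default -->
</property>
```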

The reason big buffers may impair cache locality is that CPU caches are
typically a few MB. If you set your checksum size and buffer size to 64
MB, then whenever you read a block, the CPU first has to checksum 64 MB
worth of data, then start again at the beginning to read it and pass it
through your application. During the checksumming pass, the first pages
of data fall out of CPU cache as you checksum the later ones, so you
have to read them from memory again during the second scan. If you just
had a 64 KB block, it would stay in cache from the first time you read
it. The same issue happens if, instead of checksumming, you were copying
from one buffer to another (which does happen in a few places in Hadoop,
and they tend to use io.file.buffer.size). So while I haven't tried
measuring performance with 64 MB vs 128 KB, I wouldn't be surprised if
it leads to bad behavior, because it's much higher than what anyone runs
in production.

Finally, if you just want to sequentially process a file on each node
and you only want one logical "input record" per map, it might be better
not to use an input format that reads the record into memory at all.
Instead, you can have the map directly open the file, and have your
InputFormat just locate the map on the right node. This avoids copying
the whole file into memory before streaming it through your mapper. If,
on the other hand, your algorithm requires random access throughout the
file, you do need to read it all in. I think the WholeFileRecordReader
in the FAQ is aimed at files smaller than 256 MB / 1 GB.
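A rough model of what such an InputFormat does, in plain Python standing
in for getSplits(): build one split per file, tagged with the hosts that
store the file's (single) block, which is the information
FileSystem.getFileBlockLocations would report. The scheduler can then
place the map on one of those hosts, and the record the map receives is
just the path, which it opens itself:

```python
def make_locality_splits(files, block_hosts):
    """One logical split per file: (path, hosts-that-store-it).

    `block_hosts` maps each path to the datanode hostnames holding its
    block. In a real InputFormat each entry would become a FileSplit
    whose host list steers the scheduler toward a data-local node; the
    map task then opens the path itself instead of receiving the file
    contents as a record.
    """
    return [(path, tuple(block_hosts.get(path, ()))) for path in files]
```

A file with no known locations gets an empty host list, and the
scheduler would place that map anywhere.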

On Fri, Apr 3, 2009 at 9:37 AM, Tu, Tiankai
<Tiankai.Tu@deshawresearch.com> wrote:

> Thanks for the comments, Matei.
>
> The machines I ran the experiments on have 16 GB memory each. I don't
> see how a 64 MB buffer could be huge or bad for memory consumption. In
> fact, I set it to a much larger value after initial rounds of tests
> showed abysmal results using the default 64 KB buffer. Also, why is it
> necessary to compute a checksum for every 512 bytes when only an
> end-to-end (whole file) checksum makes sense? I set it to a much
> larger value to avoid the overhead.
>
> I didn't quite understand what you meant by bad for cache locality.
> The jobs were IO bound in the first place; any cache effect was
> second-order, at least an order of magnitude less significant. Can you
> clarify which particular computation (maybe within Hadoop) was made
> slow by a large io buffer size?
>
> What bothered you was exactly what bothered me and prompted me to ask
> why the job tracker reported many more bytes read by the map tasks. I
> can confirm that the experiments were set up correctly. In fact, the
> numbers of map tasks were correctly reported by the job tracker: 1600
> for the 1GB file dataset, 6400 for the 256MB file dataset, and so
> forth.
>
> Tiankai
>
>
>
> -----Original Message-----
> From: Matei Zaharia [mailto:matei@cloudera.com]
> Sent: Friday, April 03, 2009 11:21 AM
> To: core-user@hadoop.apache.org
> Subject: Re: Hadoop/HDFS for scientific simulation output data
analysis
>
> Hi Tiankai,
>
> The one strange thing I see in your configuration as described is IO
> buffer size and IO bytes per checksum set to 64 MB. This is much
> higher than the recommended defaults, which are about 64 KB for buffer
> size and 1 KB or 512 bytes for checksum. (Actually I haven't seen
> anyone change checksum from its default of 512 bytes.) Having huge
> buffers is bad for memory consumption and cache locality.
>
> The other thing that bothers me is that on your 64 MB data set, you
> have 28 TB of HDFS bytes read. This is off from number of map tasks *
> bytes per map by an order of magnitude. Are you sure that you've
> generated the data set correctly and that it's the only input path
> given to your job? Does bin/hadoop dfs -dus <path to dataset> come out
> as 1.6 TB?
>
> Matei
>
> On Sat, Mar 28, 2009 at 4:10 PM, Tu, Tiankai
> <Tiankai.Tu@deshawresearch.com> wrote:
>
> > Hi,
> >
> > I have been exploring the feasibility of using Hadoop/HDFS to
> > analyze terabyte-scale scientific simulation output datasets. After
> > a set of initial experiments, I have a number of questions regarding
> > (1) the configuration settings and (2) the IO read performance.
> >
> >
>
> > ----------------------------------------------------------------------
> > Unlike typical Hadoop applications, post-simulation analysis usually
> > processes one file at a time. So I wrote a
> > WholeFileInputFormat/WholeFileRecordReader that reads an entire file
> > without parsing the content, as suggested by the Hadoop wiki FAQ.
> >
> > Specifying WholeFileInputFormat as the input file format
> > (conf.setInputFormat(FrameInputFormat.class)), I constructed a
> > simple MapReduce program with the sole purpose of measuring how fast
> > Hadoop/HDFS can read data. Here is the gist of the test program:
> >
> > - The map method does nothing; it returns immediately when called
> > - No reduce task (conf.setNumReduceTasks(0))
> > - JVM reused (conf.setNumTasksToExecutePerJvm(-1))
> >
> > The detailed hardware/software configurations are listed below:
> >
> > Hardware:
> > - 103 nodes, each with two 2.33GHz quad-core processors and 16 GB
> >   memory
> > - 1 GigE connection out of each node, connecting to a 1 GigE switch
> >   in the rack (3 racks in total)
> > - Each rack switch has 4 10-GigE connections to a backbone
> >   full-bandwidth 10-GigE switch (second-tier switch)
> > - Software (md) RAID0 on 4 SATA disks, with a capacity of 500 GB
> >   per node
> > - Raw RAID0 bulk data transfer rate around 200 MB/s (dd a 4GB file
> >   after dropping the linux vfs cache)
> >
> > Software:
> > - 2.6.26-10smp kernel
> > - Hadoop 0.19.1
> > - Three nodes as namenode, secondary namenode, and job tracker,
> >   respectively
> > - Remaining 100 nodes as slaves, each running as both datanode and
> >   tasktracker
> >
> > Relevant hadoop-site.xml settings:
> > - dfs.namenode.handler.count = 100
> > - io.file.buffer.size = 67108864
> > - io.bytes.per.checksum = 67108864
> > - mapred.task.timeout = 1200000
> > - mapred.map.max.attempts = 8
> > - mapred.tasktracker.map.tasks.maximum = 8
> > - dfs.replication = 3
> > - topology.script.file.name set properly to a correct Python script
> >
> > Dataset characteristics:
> >
> > - Four datasets consisting of files of 1 GB, 256 MB, 64 MB, and
> >   2 MB, respectively
> > - Each dataset holds 1.6 terabytes of data (that is, 1600 1GB
> >   files, 6400 256MB files, etc.)
> > - Datasets populated into HDFS via a parallel C MPI program (linked
> >   with libhdfs.so) running on the 100 slave nodes
> > - dfs block size set to the file size (otherwise, accessing a
> >   single file may require network data transfer)
> >
> > I launched the test MapReduce jobs one after another (so there was
> > no interference) and collected the following performance results:
> >
> > Dataset name, Finished in, Failed/Killed task attempts,
> > HDFS bytes read (Map=Total), Rack-local map tasks,
> > Launched map tasks, Data-local map tasks
> >
> > 1GB file dataset, 16mins11sec, 0/382, (2,578,054,119,424),
> > 98, 1982, 1873
> > 256MB file dataset, 50min9sec, 0/397, (7,754,295,017,472),
> > 156, 6797, 6639
> > 64MB file dataset, 4hrs18mins21sec, 394/251, (28,712,795,897,856),
> > 153, 26245, 26068
> >
> > The job for the 2MB file dataset failed to run due to the following
> > error:
> >
> > 09/03/27 21:39:58 INFO mapred.FileInputFormat: Total input paths to
> > process : 819200
> > java.lang.OutOfMemoryError: GC overhead limit exceeded
> >        at java.util.Arrays.copyOf(Arrays.java:2786)
> >        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:71)
> >        at java.io.DataOutputStream.writeByte(DataOutputStream.java:136)
> >        at org.apache.hadoop.io.UTF8.writeChars(UTF8.java:274)
> >
> > After running into this error, the job tracker no longer accepted
> > jobs. I stopped and restarted the job tracker with a larger heap
> > size (8 GB), but it still didn't accept new jobs.
> >
> >
>
> > ----------------------------------------------------------------------
> > Questions:
> >
> > (1) Why is reading 1GB files significantly faster than reading
> > smaller file sizes, even though reading a 256MB file is just as
> > much a bulk transfer?
> >
> > (2) Why are the reported HDFS bytes read significantly higher than
> > the dataset size (1.6TB)? (The percentage of failed/killed tasks
> > was much lower than the extra bytes read.)
> >
> > (3) What is the maximum number (roughly) of input paths the job
> > tracker can handle? (For scientific simulation output datasets, it
> > is quite commonplace to have hundreds of thousands to millions of
> > files.)
> >
> > (4) Even for the 1GB file dataset, considering the percentage of
> > data-local map tasks (94.5%), the overall end-to-end read bandwidth
> > (1.69GB/s) is much lower than the potential performance offered by
> > the hardware (200MB/s local RAID0 read performance, multiplied by
> > 100 slave nodes). What settings should I change (either in the test
> > MapReduce program or in the config files) to obtain better
> > performance?
> >
> > Thank you very much.
> >
> > Tiankai
> >
> >
> >
> >
>
