hadoop-hdfs-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From John Lilley <john.lil...@redpoint.net>
Subject RE: Assignment of data splits to mappers
Date Mon, 01 Jul 2013 23:07:34 GMT

Ah yes, I can see the wisdom of smaller tasks in (1).  Given that, does MR attempt to assign
multiple blocks per task when the #blocks >> #nodes?

Regarding (2) we can run a simple thought experiment.  It seems likely that every block will
have one dangling record, requiring a small read of the next block.  Let's consider the disk
seek overhead.  Assume 64MB blocks and 6ms disk seeks.  If our network transports 64MB/sec
on average, and a disk seek is 6ms, we can expect a task to take 1.006 seconds instead of
1.000 seconds.  The read of the extra block may induce a secondary seek under load by interrupting
an otherwise-sequential transfer on another task, but it still seems likely that the seek
overhead is < 1% under these assumptions.  It would be nice to know the TCP connection
setup/teardown overhead as well, assuming each datanode switch induces a connection.


From: Bertrand Dechoux [mailto:dechouxb@gmail.com]
Sent: Tuesday, June 18, 2013 3:54 PM
To: user@hadoop.apache.org
Subject: Re: Assignment of data splits to mappers

1) The tradeoff is between reducing the overhead of distributed computing and reducing the
cost of failure.
Less tasks, less overhead but the cost of failure will be bigger, mainly because the distribution
will be coarser. One of the reason was outlined before. A (failed) task is related to an input
split. Even when there is a single remaining task, the job tracker can not split it into several
smaller subtasks to reduce the overall latency. But if there is too much tasks, the startup
of the JVM itself can be a significant overhead.
2) It is assumed to not be significant. I would be interested to see the numbers but I don't
know any deep studies of the impact.
The biggest


On Fri, Jun 14, 2013 at 9:50 PM, John Lilley <john.lilley@redpoint.net<mailto:john.lilley@redpoint.net>>
Thanks for taking the time to explain this!
I understand your point about contiguous blocks; they just aren't likely to exist.  I am still
curious about two things:

1)      The map-per-block strategy.  If we have a lot more blocks than containers, wouldn't
there be some advantage to having fewer maps (which means fewer connections, less seeking
etc)?  Of course, increasing the block size would lead to the same thing and contiguous data
to boot, but one doesn't always know the total data size.

2)      The record-spanning-blocks issue.  I understand that under most file formats, records
*will* span blocks.  But if it were simple to prevent them from spanning blocks, would that
be of benefit?

From: Bertrand Dechoux [mailto:dechouxb@gmail.com<mailto:dechouxb@gmail.com>]
Sent: Thursday, June 13, 2013 3:37 PM
To: user@hadoop.apache.org<mailto:user@hadoop.apache.org>
Subject: Re: Assignment of data splits to mappers

The first question can be split (no pun intended) into two topics because there is actually
two distinct steps. First, the InputFormat partitions the data source into InputSplits. Its
implementation will determine the exact logic. Then the scheduler is responsible for ordering
where/when the InputSplit should be processed. But it doesn't really deal with block itself.
The InputSplit itself knows on which node the data would be local or not.
If there is no other choice, you (or more exactly the implementation) can choose to have several
blocks per InputSplit. But of course, it open lots of issues. The default strategy is one
block per InputSplit (and thus per map task because there is one map task per InputSplit).
If you really need to put several blocks per InputSplit, the root cause might often be that
the block size is not big enough. I think it is fair to assume that the 10000 block file your
are referring to is not using a 512MB block size.

MultiFileInputFormat does make InputSplit with blocks that are unlikely to be on the same
datanode. But that's a good decision in regard to the kind of data source it has to deal with.
Anyway, two 'continuous' blocks are also very unlikely to be on the same datanode (and even
less the same HDD, and even less really continuous). The only abstraction to tell whether
record of data should be close one from the other is the block. That's why the idea is not
really to optimize read of 'continuous' blocks on the same machine/HDD but to consider whether
the block size is the right one.

HDFS and Hadoop MapReduce have been designed to work together but there is a clean abstraction
between them. HDFS does not know about records and clients writing to HDFS (like MapReduce)
do not often need to know the block boundaries explicitly. That's why the RecordReader provided
by the InputSplit is responsible for interpreting the data into records. But of course, it
has to know how to deal with records stored on the block boundary. It will happen. The advantage
is that the record logic can not corrupt the storage and can be selected at read time. TextInputFormat,
KeyValueTextInputFormat and NLineInputFormat have different strategies which is only possible
due to this abstraction. And that's also why MapReduce can read/write to other kinds of 'datastorage',
like HBase for example : because it is not tightly coupled with HDFS. But it does also bring

On Thu, Jun 13, 2013 at 7:57 PM, John Lilley <john.lilley@redpoint.net<mailto:john.lilley@redpoint.net>>
When MR assigns data splits to map tasks, does it assign a set of non-contiguous blocks to
one map?  The reason I ask is, thinking through the problem, if I were the MR scheduler I
would attempt to hand a map task a bunch of blocks that all exist on the same datanode, and
then schedule the map task on that node.  E.g. if I have an HDFS file with 10000 blocks and
I want to create 1000 map tasks I'd like each map task to have 10 blocks, but those blocks
are unlikely to be contiguous on a given datanode.

This is related to a question I had asked earlier, which is whether any benefit could be had
by aligning data splits along block boundaries to avoid slopping reads of a block to the next
block and requiring another datanode connection.  The answer I got was that the extra connection
overhead wasn't important.  The reason I bring this up again is that comments in this discussion
(https://issues.apache.org/jira/browse/HADOOP-3315) imply that doing an extra seek to the
beginning of the file to read a magic number on open is a significant overhead, and this looks
like a similar issue to me.


Bertrand Dechoux

Bertrand Dechoux

View raw message