hadoop-mapreduce-issues mailing list archives

From "Chandra Prakash Bhagtani (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (MAPREDUCE-5611) CombineFileInputFormat creates more rack-local tasks due to less split location info.
Date Thu, 07 Nov 2013 07:01:31 GMT

    [ https://issues.apache.org/jira/browse/MAPREDUCE-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13815716#comment-13815716 ]

Chandra Prakash Bhagtani commented on MAPREDUCE-5611:
-----------------------------------------------------

I am suggesting the following small patch for this issue. The patched line is in *bold*.

{code:title=CombineFileInputFormat.java|borderStyle=solid}
for (OneBlockInfo oneblock : blocksInNode) {
  if (blockToNodes.containsKey(oneblock)) {
    *nodes.addAll(Arrays.asList(blockToNodes.get(oneblock)));*
    validBlocks.add(oneblock);
    blockToNodes.remove(oneblock);
    curSplitSize += oneblock.length;

    // if the accumulated split size exceeds the maximum, then
    // create this split.
    if (maxSize != 0 && curSplitSize >= maxSize) {
      // create an input split and add it to the splits array
      addCreatedSplit(splits, nodes, validBlocks);
      curSplitSize = 0;
      validBlocks.clear();
    }
  }
}
{code}
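
For clarity, here is a simplified sketch of where the collected locations end up: addCreatedSplit packs the accumulated blocks into a single CombineFileSplit whose locations are exactly the contents of the nodes collection. Names below are based on my reading of the 2.2.0 source and are meant as an illustration, not the exact implementation.

{code:title=Sketch: how the nodes collection becomes the split's locations|borderStyle=solid}
// Simplified sketch only (inside CombineFileInputFormat; field/method names
// based on my reading of the 2.2.0 source, not the exact implementation).
// Every host added to "nodes" above becomes a candidate for a data-local
// assignment, because it is advertised as a location of the created split.
private void addCreatedSplit(List<InputSplit> splitList,
                             Collection<String> locations,
                             ArrayList<OneBlockInfo> validBlocks) {
  Path[] files = new Path[validBlocks.size()];
  long[] offsets = new long[validBlocks.size()];
  long[] lengths = new long[validBlocks.size()];
  for (int i = 0; i < validBlocks.size(); i++) {
    files[i] = validBlocks.get(i).onepath;
    offsets[i] = validBlocks.get(i).offset;
    lengths[i] = validBlocks.get(i).length;
  }
  // The split advertises every host in "locations"; the scheduler may pick any of them.
  splitList.add(new CombineFileSplit(files, offsets, lengths,
                                     locations.toArray(new String[0])));
}
{code}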

The patched line adds to the split's location set every node that holds a replica of the block, so
whichever node the JT schedules the task on, the data is local to that node (in the case where
replication factor = cluster size). The patch also helps with a replication factor of 3, as the
following numbers from my cluster show:

replication factor 9, without patch: all 6 tasks rack-local
replication factor 9, with patch: 5 tasks data-local and 1 rack-local

replication factor 3, without patch: 1 data-local and 5 rack-local
replication factor 3, with patch: 2 data-local and 4 rack-local
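
For reference, the data-local / rack-local counts above are the job's standard locality counters. A minimal, purely illustrative sketch of reading them through the MRv2 client API is below (the class name is made up for the example):

{code:title=Sketch: reading the locality counters|borderStyle=solid}
// Minimal sketch, purely illustrative: the data-local / rack-local numbers
// quoted above are the standard JobCounter values of a completed job.
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;

public class LocalityReport {
  public static void print(Job job) throws Exception {
    Counters counters = job.getCounters();
    long dataLocal = counters.findCounter(JobCounter.DATA_LOCAL_MAPS).getValue();
    long rackLocal = counters.findCounter(JobCounter.RACK_LOCAL_MAPS).getValue();
    long totalMaps = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue();
    System.out.println("data-local=" + dataLocal
        + ", rack-local=" + rackLocal + ", total maps=" + totalMaps);
  }
}
{code}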

Please let me know if I can submit the above patch.

> CombineFileInputFormat creates more rack-local tasks due to less split location info.
> -------------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-5611
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-5611
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>    Affects Versions: 2.2.0
>            Reporter: Chandra Prakash Bhagtani
>            Assignee: Chandra Prakash Bhagtani
>
> I have come across an issue with CombineFileInputFormat. I ran a Hive query on approx. 1.2 GB of
> data with CombineHiveInputFormat, which internally uses CombineFileInputFormat. My cluster has
> 9 datanodes and max.split.size is 256 MB.
> When I ran this query with replication factor 9, Hive consistently created all 6 tasks rack-local,
> and with replication factor 3 it created 5 rack-local and 1 data-local task.
> When the replication factor is 9 (equal to the cluster size), all the tasks should be data-local,
> since each datanode holds a replica of all the input data, but that is not happening: all the
> tasks are rack-local.
> When I dug into the CombineFileInputFormat.java code, in the getMoreSplits method I found the
> issue in the following snippet (especially in the case of a higher replication factor):
> {code:title=CombineFileInputFormat.java|borderStyle=solid}
> for (Iterator<Map.Entry<String,
>          List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
>          iter.hasNext();) {
>   Map.Entry<String, List<OneBlockInfo>> one = iter.next();
>   nodes.add(one.getKey());
>   List<OneBlockInfo> blocksInNode = one.getValue();
>
>   // for each block, copy it into validBlocks. Delete it from
>   // blockToNodes so that the same block does not appear in
>   // two different splits.
>   for (OneBlockInfo oneblock : blocksInNode) {
>     if (blockToNodes.containsKey(oneblock)) {
>       validBlocks.add(oneblock);
>       blockToNodes.remove(oneblock);
>       curSplitSize += oneblock.length;
>
>       // if the accumulated split size exceeds the maximum, then
>       // create this split.
>       if (maxSize != 0 && curSplitSize >= maxSize) {
>         // create an input split and add it to the splits array
>         addCreatedSplit(splits, nodes, validBlocks);
>         curSplitSize = 0;
>         validBlocks.clear();
>       }
>     }
>   }
> {code}
> The first node in the nodeToBlocks map has all the replicas of the input file, so the above code
> creates 6 splits, each with only one location. If the JT does not schedule these tasks on that
> node, all the tasks end up rack-local, even though all the other datanodes also hold replicas.



--
This message was sent by Atlassian JIRA
(v6.1#6144)
