tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-1656. Grouping of splits should maintain the original ordering of splits within a group (bikas) (cherry picked from commit fdebd19941c34f3d879a322e11cfe3642876b463)
Date Tue, 21 Oct 2014 22:02:14 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.5 92a586228 -> 03315be71


TEZ-1656. Grouping of splits should maintain the original ordering of splits within a group
(bikas)
(cherry picked from commit fdebd19941c34f3d879a322e11cfe3642876b463)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/03315be7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/03315be7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/03315be7

Branch: refs/heads/branch-0.5
Commit: 03315be7199ec9b56014d00a6ea15e1d70f56bb2
Parents: 92a5862
Author: Bikas Saha <bikas@apache.org>
Authored: Tue Oct 21 14:54:04 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Tue Oct 21 15:01:45 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../mapred/split/TezMapredSplitsGrouper.java    | 17 ++++-
 .../split/TezMapReduceSplitsGrouper.java        | 16 +++-
 .../hadoop/mapred/split/TestGroupedSplits.java  | 80 ++++++++++++++++++++
 4 files changed, 110 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/03315be7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index df6940d..24a5489 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -40,6 +40,8 @@ ALL CHANGES:
   TEZ-1686. TestRecoveryParser.testGetLastCompletedDAG fails sometimes
   TEZ-1667. Add a system test for InitializerEvents.
   TEZ-1668. InputInitializers should be able to register for Vertex state updates in the constructor.
+  TEZ-1656. Grouping of splits should maintain the original ordering of splits
+  within a group
 
 Release 0.5.1: 2014-10-02
 

http://git-wip-us.apache.org/repos/asf/tez/blob/03315be7/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
index 92c6df5..022167e 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
@@ -359,8 +359,20 @@ public class TezMapredSplitsGrouper {
             rackLocations.put(rack, new LocationHolder(numRemainingSplits));
           }
         }
+        distinctLocations.clear();
         HashSet<String> rackSet = new HashSet<String>(rackLocations.size());
-        for (InputSplit split : remainingSplits) {
+        int numRackSplitsToGroup = remainingSplits.size();
+        for (InputSplit split : originalSplits) {
+          if (numRackSplitsToGroup == 0) {
+            break;
+          }
+          // Iterate through the original splits in their order and consider them for grouping.
+          // This maintains the original ordering in the list and thus subsequent grouping will
+          // maintain that order
+          // maintain that order
+          if (!remainingSplits.contains(split)) {
+            continue;
+          }
+          numRackSplitsToGroup--;
           rackSet.clear();
           SplitHolder splitHolder = new SplitHolder(split);
           String[] locations = split.getLocations();
@@ -377,8 +389,7 @@ public class TezMapredSplitsGrouper {
             rackLocations.get(rack).splits.add(splitHolder);
           }
         }
-        
-        distinctLocations.clear();
+        remainingSplits.clear();
         distinctLocations = rackLocations;
         // adjust split length to be smaller because the data is non local
         float rackSplitReduction = conf.getFloat(

http://git-wip-us.apache.org/repos/asf/tez/blob/03315be7/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
index 2943d30..5fe9f59 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
@@ -405,8 +405,20 @@ public class TezMapReduceSplitsGrouper {
             rackLocations.put(rack, new LocationHolder(numRemainingSplits));
           }
         }
+        distinctLocations.clear();
         HashSet<String> rackSet = new HashSet<String>(rackLocations.size());
-        for (InputSplit split : remainingSplits) {
+        int numRackSplitsToGroup = remainingSplits.size();
+        for (InputSplit split : originalSplits) {
+          if (numRackSplitsToGroup == 0) {
+            break;
+          }
+          // Iterate through the original splits in their order and consider them for grouping.
+          // This maintains the original ordering in the list and thus subsequent grouping will
+          // maintain that order
+          // maintain that order
+          if (!remainingSplits.contains(split)) {
+            continue;
+          }
+          numRackSplitsToGroup--;
           rackSet.clear();
           SplitHolder splitHolder = new SplitHolder(split);
           String[] locations = split.getLocations();
@@ -424,7 +436,7 @@ public class TezMapReduceSplitsGrouper {
           }
         }
         
-        distinctLocations.clear();
+        remainingSplits.clear();
         distinctLocations = rackLocations;
         // adjust split length to be smaller because the data is non local
         float rackSplitReduction = conf.getFloat(

http://git-wip-us.apache.org/repos/asf/tez/blob/03315be7/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
index 355d1ea..72ea035 100644
--- a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
+++ b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.mapred.split;
 
 import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -328,6 +330,84 @@ public class TestGroupedSplits {
     Assert.assertEquals(25, splits.length);
     
   }
+  
+  class TestInputSplit implements InputSplit {
+    long length;
+    String[] locations;
+    int position;
+    
+    public TestInputSplit(long length, String[] locations, int position) {
+      this.length = length;
+      this.locations = locations;
+      this.position = position;
+    }
+    
+    @Override
+    public void write(DataOutput out) throws IOException {
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+    }
+
+    @Override
+    public long getLength() throws IOException {
+      return length;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException {
+      return locations;
+    }
+    
+    public int getPosition() {
+      return position;
+    }
+  }
+  
+  @Test (timeout=5000)
+  public void testMaintainSplitOrdering() throws IOException {
+    int numLocations = 3;
+    String[] locations = new String[numLocations];
+    InputSplit[] origSplits = new InputSplit[numLocations*4];
+    long splitLength = 100;
+    for (int i=0; i<numLocations; i++) {
+      locations[i] = "node" + i;
+      String[] splitLoc = {locations[i]};
+      for (int j=0; j<4; j++) {
+        int pos = i*4 + j;
+        origSplits[pos] = new TestInputSplit(splitLength, splitLoc, pos);
+      }
+    }
+    
+    TezMapredSplitsGrouper grouper = new TezMapredSplitsGrouper();
+    JobConf conf = new JobConf(defaultConf);
+    conf = (JobConf) TezMapReduceSplitsGrouper.createConfigBuilder(conf)
+    .setGroupingSplitSize(splitLength*3, splitLength*3)
+    .setGroupingRackSplitSizeReduction(1)
+    .build();
+    
+    // based on the above settings the 3 nodes will each group 3 splits.
+    // the remainig 3 splits (1 from each node) will be grouped at rack level (default-rack)
+    // all of them will maintain ordering
+    InputSplit[] groupedSplits = grouper.getGroupedSplits(conf, origSplits, 4, "InputFormat");
+    Assert.assertEquals(4, groupedSplits.length);
+    for (int i=0; i<4; ++i) {
+      TezGroupedSplit split = (TezGroupedSplit)groupedSplits[i];
+      List<InputSplit> innerSplits = split.getGroupedSplits();
+      int pos = -1;
+      // splits in group maintain original order
+      for (InputSplit innerSplit : innerSplits) {
+        int splitPos = ((TestInputSplit) innerSplit).getPosition();
+        Assert.assertTrue(pos < splitPos);
+        pos = splitPos;
+      }
+      // last one is rack split
+      if (i==3) {
+        Assert.assertTrue(split.getRack() != null);
+      }
+    }
+  }
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
   @Test(timeout=10000)


Mime
View raw message