tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-1396. Grouping should generate consistent groups when given the same set of splits (bikas)
Date Tue, 21 Oct 2014 22:00:13 GMT
Repository: tez
Updated Branches:
  refs/heads/master fdebd1994 -> 7a802b13d


TEZ-1396. Grouping should generate consistent groups when given the same set of splits (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7a802b13
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7a802b13
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7a802b13

Branch: refs/heads/master
Commit: 7a802b13d19c09ffe7bf0c331e0fac7234d0063b
Parents: fdebd19
Author: Bikas Saha <bikas@apache.org>
Authored: Tue Oct 21 14:59:56 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Tue Oct 21 14:59:56 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../mapred/split/TezMapredSplitsGrouper.java    | 13 +++-
 .../split/TezMapReduceSplitsGrouper.java        | 25 +++++++-
 .../hadoop/mapred/split/TestGroupedSplits.java  | 62 ++++++++++++++++++++
 4 files changed, 97 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/7a802b13/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 314338e..3b85ff8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -49,6 +49,8 @@ ALL CHANGES:
   TEZ-1668. InputInitializers should be able to register for Vertex state updates in the constructor.
   TEZ-1656. Grouping of splits should maintain the original ordering of splits
   within a group
+  TEZ-1396. Grouping should generate consistent groups when given the same set
+  of splits
 
 Release 0.5.1: 2014-10-02
 

http://git-wip-us.apache.org/repos/asf/tez/blob/7a802b13/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
index 022167e..af39948 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -80,6 +81,14 @@ public class TezMapredSplitsGrouper {
     }
   }
   
+  Map<String, LocationHolder> createLocationsMap(Configuration conf) {
+    if (conf.getBoolean(TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE, 
+        TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE_DEFAULT)) {
+      return new TreeMap<String, LocationHolder>();
+    }
+    return new HashMap<String, LocationHolder>();
+  }
+  
   public InputSplit[] getGroupedSplits(Configuration conf,
       InputSplit[] originalSplits, int desiredNumSplits,
       String wrappedInputFormatName) throws IOException {
@@ -171,7 +180,7 @@ public class TezMapredSplitsGrouper {
     List<InputSplit> groupedSplitsList = new ArrayList<InputSplit>(desiredNumSplits);
     
     long totalLength = 0;
-    Map<String, LocationHolder> distinctLocations = new HashMap<String, LocationHolder>();
+    Map<String, LocationHolder> distinctLocations = createLocationsMap(conf);
     // go through splits and add them to locations
     for (InputSplit split : originalSplits) {
       totalLength += split.getLength();
@@ -347,7 +356,7 @@ public class TezMapredSplitsGrouper {
         // splits is expected to be much smaller
         RackResolver.init(conf);
         Map<String, String> locToRackMap = new HashMap<String, String>(distinctLocations.size());
-        Map<String, LocationHolder> rackLocations = new HashMap<String, LocationHolder>();
+        Map<String, LocationHolder> rackLocations = createLocationsMap(conf);
         for (String location : distinctLocations.keySet()) {
           String rack = emptyLocation;
           if (location != emptyLocation) {

http://git-wip-us.apache.org/repos/asf/tez/blob/7a802b13/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
index 5fe9f59..1b919da 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
@@ -25,6 +25,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
+
+import javax.annotation.Nullable;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -96,6 +99,14 @@ public class TezMapReduceSplitsGrouper {
   public static final String TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION = 
                                               "tez.grouping.rack-split-reduction";
   public static final float TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT = 0.75f;
+  
+  /**
+   * Repeated invocations of grouping on the same splits with the same parameters will produce the
+   * same groups. This may help in cache reuse but may cause hot-spotting on nodes when there are a
+   * large number of jobs reading the same hot data. True by default.
+   */
+  public static final String TEZ_GROUPING_REPEATABLE = "tez.grouping.repeatable";
+  public static final boolean TEZ_GROUPING_REPEATABLE_DEFAULT = true;
 
   class SplitHolder {
     InputSplit split;
@@ -129,6 +140,14 @@ public class TezMapReduceSplitsGrouper {
     }
   }
   
+  Map<String, LocationHolder> createLocationsMap(Configuration conf) {
+    if (conf.getBoolean(TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE, 
+        TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE_DEFAULT)) {
+      return new TreeMap<String, LocationHolder>();
+    }
+    return new HashMap<String, LocationHolder>();
+  }
+  
   public List<InputSplit> getGroupedSplits(Configuration conf,
       List<InputSplit> originalSplits, int desiredNumSplits,
       String wrappedInputFormatName) throws IOException, InterruptedException {
@@ -217,7 +236,7 @@ public class TezMapReduceSplitsGrouper {
     groupedSplits = new ArrayList<InputSplit>(desiredNumSplits);
     
     long totalLength = 0;
-    Map<String, LocationHolder> distinctLocations = new HashMap<String, LocationHolder>();
+    Map<String, LocationHolder> distinctLocations = createLocationsMap(conf);
     // go through splits and add them to locations
     for (InputSplit split : originalSplits) {
       totalLength += split.getLength();
@@ -393,7 +412,7 @@ public class TezMapReduceSplitsGrouper {
         // splits is expected to be much smaller
         RackResolver.init(conf);
         Map<String, String> locToRackMap = new HashMap<String, String>(distinctLocations.size());
-        Map<String, LocationHolder> rackLocations = new HashMap<String, LocationHolder>();
+        Map<String, LocationHolder> rackLocations = createLocationsMap(conf);
         for (String location : distinctLocations.keySet()) {
           String rack = emptyLocation;
           if (location != emptyLocation) {
@@ -508,7 +527,7 @@ public class TezMapReduceSplitsGrouper {
     /**
      * This configuration will be modified in place
      */
-    private TezMRSplitsGrouperConfigBuilder(Configuration conf) {
+    private TezMRSplitsGrouperConfigBuilder(@Nullable Configuration conf) {
       if (conf == null) {
         conf = new Configuration(false);
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/7a802b13/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
index 72ea035..1560e90 100644
--- a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
+++ b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
@@ -409,6 +409,68 @@ public class TestGroupedSplits {
     }
   }
 
+  @Test (timeout=5000)
+  public void testRepeatableSplits() throws IOException {
+    int numLocations = 3;
+    String[] locations = new String[numLocations];
+    InputSplit[] origSplits = new InputSplit[numLocations*4];
+    long splitLength = 100;
+    for (int i=0; i<numLocations; i++) {
+      locations[i] = "node" + i;
+    }
+    for (int i=0; i<4; i++) {
+      String[] splitLoc = null;
+      for (int j=0; j<3; j++) {
+        int pos = i*3 + j;
+        if (pos < 9) {
+          // for the first 9 splits do node grouping
+          // copy of the string to verify the comparator does not succeed by comparing the same object
+          // provide 2 locations for each split to provide alternates for non-repeatability
+          String[] nodeLoc = {new String(locations[i]), new String(locations[(i+1)%numLocations])};
+          splitLoc = nodeLoc;
+        } else {
+          // for the last 3 splits do rack grouping by spreading them across the 3 nodes
+          String[] rackLoc = {new String(locations[j])};
+          splitLoc = rackLoc;
+        }
+        origSplits[pos] = new TestInputSplit(splitLength, splitLoc, pos);
+      }
+    }
+
+    TezMapredSplitsGrouper grouper = new TezMapredSplitsGrouper();
+    JobConf conf = new JobConf(defaultConf);
+    conf = (JobConf) TezMapReduceSplitsGrouper.createConfigBuilder(conf)
+    .setGroupingSplitSize(splitLength*3, splitLength*3)
+    .setGroupingRackSplitSizeReduction(1)
+    .build();
+    
+    // based on the above settings the 3 nodes will each group 3 splits.
+    // the remainig 3 splits (1 from each node) will be grouped at rack level (default-rack)
+    // all of them will maintain ordering
+    InputSplit[] groupedSplits1 = grouper.getGroupedSplits(conf, origSplits, 4, "InputFormat");
+    InputSplit[] groupedSplits2 = grouper.getGroupedSplits(conf, origSplits, 4, "InputFormat");
+    Assert.assertEquals(4, groupedSplits1.length);
+    Assert.assertEquals(4, groupedSplits2.length);
+    // check both split groups are the same. this depends on maintaining split order tested above
+    for (int i=0; i<4; ++i) {
+      TezGroupedSplit gSplit1 = ((TezGroupedSplit) groupedSplits1[i]);
+      List<InputSplit> testSplits1 = gSplit1.getGroupedSplits();
+      TezGroupedSplit gSplit2 = ((TezGroupedSplit) groupedSplits2[i]);
+      List<InputSplit> testSplits2 = gSplit2.getGroupedSplits();
+      Assert.assertEquals(testSplits1.size(), testSplits2.size());
+      for (int j=0; j<testSplits1.size(); j++) {
+        TestInputSplit split1 = (TestInputSplit) testSplits1.get(j);
+        TestInputSplit split2 = (TestInputSplit) testSplits2.get(j);
+        Assert.assertEquals(split1.position, split2.position);
+      }
+      if (i==3) {
+        // check for rack split creation. Ensures repeatability holds for rack splits also
+        Assert.assertTrue(gSplit1.getRack() != null);
+        Assert.assertTrue(gSplit2.getRack() != null);
+      }
+    }
+  }
+  
   @SuppressWarnings({ "rawtypes", "unchecked" })
   @Test(timeout=10000)
   public void testGroupedSplitWithDuplicates() throws IOException {


Mime
View raw message