tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [13/35] tez git commit: TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly (gopalv)
Date Tue, 07 Apr 2015 20:12:31 GMT
TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly (gopalv)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/26518d5d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/26518d5d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/26518d5d

Branch: refs/heads/TEZ-2003
Commit: 26518d5dae1b04b3ea48b0518abd6b2312e279b9
Parents: 5e2a55f
Author: Gopal V <gopalv@apache.org>
Authored: Mon Apr 6 13:58:21 2015 -0700
Committer: Gopal V <gopalv@apache.org>
Committed: Mon Apr 6 13:58:21 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../hadoop/mapred/split/SplitSizeEstimator.java | 29 ++++++++
 .../split/TezGroupedSplitsInputFormat.java      | 13 +++-
 .../mapred/split/TezMapredSplitsGrouper.java    | 30 +++++++--
 .../mapreduce/split/SplitSizeEstimator.java     | 27 ++++++++
 .../split/TezGroupedSplitsInputFormat.java      | 13 +++-
 .../split/TezMapReduceSplitsGrouper.java        | 33 ++++++++--
 .../hadoop/mapred/split/TestGroupedSplits.java  | 69 ++++++++++++++++++++
 8 files changed, 202 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/26518d5d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8fad569..ee0ef70 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Release 0.7.0: Unreleased
 
 INCOMPATIBLE CHANGES
   TEZ-2176. Move all logging to slf4j. (commons-logging jar no longer part of Tez tar)
+  TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
+
 
 ALL CHANGES:
   TEZ-2232. Allow setParallelism to be called multiple times before tasks get

http://git-wip-us.apache.org/repos/asf/tez/blob/26518d5d/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/SplitSizeEstimator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/SplitSizeEstimator.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/SplitSizeEstimator.java
new file mode 100644
index 0000000..a4c0e73
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/SplitSizeEstimator.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.split;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+/* The one big difference between the mapred.* and mapreduce.* split classes
+ * is that the mapreduce variant also throws InterruptedException. */
+public interface SplitSizeEstimator {
+  long getEstimatedSize(InputSplit split) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/26518d5d/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
index ddfb856..707f9ad 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.split.SplitSizeEstimator;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
 
@@ -50,6 +51,8 @@ public class TezGroupedSplitsInputFormat<K, V>
   InputFormat<K, V> wrappedInputFormat;
   int desiredNumSplits = 0;
   Configuration conf;
+
+  SplitSizeEstimator estimator;
   
   public TezGroupedSplitsInputFormat() {
     
@@ -61,6 +64,14 @@ public class TezGroupedSplitsInputFormat<K, V>
       LOG.debug("wrappedInputFormat: " + wrappedInputFormat.getClass().getName());
     }
   }
+
+  public void setSplitSizeEstimator(SplitSizeEstimator estimator) {
+    Preconditions.checkArgument(estimator != null);
+    this.estimator = estimator;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Split size estimator : " + estimator);
+    }
+  }
   
   public void setDesiredNumberOfSplits(int num) {
     Preconditions.checkArgument(num >= 0);
@@ -75,7 +86,7 @@ public class TezGroupedSplitsInputFormat<K, V>
     InputSplit[] originalSplits = wrappedInputFormat.getSplits(job, numSplits);
     TezMapredSplitsGrouper grouper = new TezMapredSplitsGrouper();
     String wrappedInputFormatName = wrappedInputFormat.getClass().getName();
-    return grouper.getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName);
+    return grouper.getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, estimator);
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/26518d5d/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
index 4ef50fd..29b5e1e 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.split.SplitSizeEstimator;
 import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
 import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -81,6 +82,15 @@ public class TezMapredSplitsGrouper {
     }
   }
   
+  private static final SplitSizeEstimator DEFAULT_SPLIT_ESTIMATOR = new DefaultSplitSizeEstimator();
+
+  static final class DefaultSplitSizeEstimator implements SplitSizeEstimator {
+    @Override
+    public long getEstimatedSize(InputSplit split) throws IOException {
+      return split.getLength();
+    }
+  }
+
   Map<String, LocationHolder> createLocationsMap(Configuration conf) {
     if (conf.getBoolean(TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE, 
         TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE_DEFAULT)) {
@@ -92,6 +102,12 @@ public class TezMapredSplitsGrouper {
   public InputSplit[] getGroupedSplits(Configuration conf,
       InputSplit[] originalSplits, int desiredNumSplits,
       String wrappedInputFormatName) throws IOException {
+    return getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, null);
+  }
+
+  public InputSplit[] getGroupedSplits(Configuration conf,
+      InputSplit[] originalSplits, int desiredNumSplits,
+      String wrappedInputFormatName, SplitSizeEstimator estimator) throws IOException {
     LOG.info("Grouping splits in Tez");
 
     int configNumSplits = conf.getInt(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_COUNT, 0);
@@ -100,7 +116,11 @@ public class TezMapredSplitsGrouper {
       desiredNumSplits = configNumSplits;
       LOG.info("Desired numSplits overridden by config to: " + desiredNumSplits);
     }
-    
+
+    if (estimator == null) {
+      estimator = DEFAULT_SPLIT_ESTIMATOR;
+    }
+
     if (! (configNumSplits > 0 || 
           originalSplits == null || 
           originalSplits.length == 0) ) {
@@ -110,7 +130,7 @@ public class TezMapredSplitsGrouper {
       // Do sanity checks
       long totalLength = 0;
       for (InputSplit split : originalSplits) {
-        totalLength += split.getLength();
+        totalLength += estimator.getEstimatedSize(split);
       }
 
       int splitCount = desiredNumSplits>0?desiredNumSplits:originalSplits.length;
@@ -183,7 +203,7 @@ public class TezMapredSplitsGrouper {
     Map<String, LocationHolder> distinctLocations = createLocationsMap(conf);
     // go through splits and add them to locations
     for (InputSplit split : originalSplits) {
-      totalLength += split.getLength();
+      totalLength += estimator.getEstimatedSize(split);
       String[] locations = split.getLocations();
       if (locations == null || locations.length == 0) {
         locations = emptyLocations;
@@ -272,13 +292,13 @@ public class TezMapredSplitsGrouper {
         int groupNumSplits = 0;
         do {
           group.add(splitHolder);
-          groupLength += splitHolder.split.getLength();
+          groupLength += estimator.getEstimatedSize(splitHolder.split);
           groupNumSplits++;
           holder.incrementHeadIndex();
           splitHolder = holder.getUnprocessedHeadSplit();
         } while(splitHolder != null  
             && (!groupByLength || 
-                (groupLength + splitHolder.split.getLength() <= lengthPerGroup))
+                (groupLength + estimator.getEstimatedSize(splitHolder.split) <= lengthPerGroup))
             && (!groupByCount || 
                 (groupNumSplits + 1 <= numSplitsInGroup)));
 

http://git-wip-us.apache.org/repos/asf/tez/blob/26518d5d/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitSizeEstimator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitSizeEstimator.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitSizeEstimator.java
new file mode 100644
index 0000000..2f8d74f
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitSizeEstimator.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+public interface SplitSizeEstimator {
+  long getEstimatedSize(InputSplit split) throws InterruptedException, IOException;
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/26518d5d/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
index f5999b3..519b52a 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
@@ -52,6 +52,7 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
   InputFormat<K, V> wrappedInputFormat;
   int desiredNumSplits = 0;
   Configuration conf;
+  SplitSizeEstimator estimator;
   
   public TezGroupedSplitsInputFormat() {
     
@@ -71,7 +72,15 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
       LOG.debug("desiredNumSplits: " + desiredNumSplits);
     }
   }
-  
+
+  public void setSplitSizeEstimator(SplitSizeEstimator estimator) {
+    Preconditions.checkArgument(estimator != null);
+    this.estimator = estimator;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Split size estimator : " + estimator);
+    }
+  }
+
   class SplitHolder {
     InputSplit split;
     boolean isProcessed = false;
@@ -110,7 +119,7 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
     List<InputSplit> originalSplits = wrappedInputFormat.getSplits(context);
     TezMapReduceSplitsGrouper grouper = new TezMapReduceSplitsGrouper();
     String wrappedInputFormatName = wrappedInputFormat.getClass().getName();
-    return grouper.getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName);
+    return grouper.getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, estimator);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/26518d5d/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
index 6caeba4..88b9845 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java
@@ -139,7 +139,17 @@ public class TezMapReduceSplitsGrouper {
       headIndex++;
     }
   }
-  
+
+  private static final SplitSizeEstimator DEFAULT_SPLIT_ESTIMATOR = new DefaultSplitSizeEstimator();
+
+  static final class DefaultSplitSizeEstimator implements SplitSizeEstimator {
+    @Override
+    public long getEstimatedSize(InputSplit split) throws InterruptedException,
+        IOException {
+      return split.getLength();
+    }
+  }
+
   Map<String, LocationHolder> createLocationsMap(Configuration conf) {
     if (conf.getBoolean(TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE, 
         TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE_DEFAULT)) {
@@ -151,6 +161,13 @@ public class TezMapReduceSplitsGrouper {
   public List<InputSplit> getGroupedSplits(Configuration conf,
       List<InputSplit> originalSplits, int desiredNumSplits,
       String wrappedInputFormatName) throws IOException, InterruptedException {
+    return getGroupedSplits(conf, originalSplits, desiredNumSplits,
+        wrappedInputFormatName, null);
+  }
+
+  public List<InputSplit> getGroupedSplits(Configuration conf,
+      List<InputSplit> originalSplits, int desiredNumSplits,
+      String wrappedInputFormatName, SplitSizeEstimator estimator) throws IOException, InterruptedException {
     LOG.info("Grouping splits in Tez");
 
     int configNumSplits = conf.getInt(TEZ_GROUPING_SPLIT_COUNT, 0);
@@ -159,7 +176,11 @@ public class TezMapReduceSplitsGrouper {
       desiredNumSplits = configNumSplits;
       LOG.info("Desired numSplits overridden by config to: " + desiredNumSplits);
     }
-    
+
+    if (estimator == null) {
+      estimator = DEFAULT_SPLIT_ESTIMATOR;
+    }
+
     if (! (configNumSplits > 0 || 
           originalSplits == null || 
           originalSplits.size() == 0)) {
@@ -170,7 +191,7 @@ public class TezMapReduceSplitsGrouper {
       // Do sanity checks
       long totalLength = 0;
       for (InputSplit split : originalSplits) {
-        totalLength += split.getLength();
+        totalLength += estimator.getEstimatedSize(split);
       }
   
       int splitCount = desiredNumSplits>0?desiredNumSplits:originalSplits.size();
@@ -239,7 +260,7 @@ public class TezMapReduceSplitsGrouper {
     Map<String, LocationHolder> distinctLocations = createLocationsMap(conf);
     // go through splits and add them to locations
     for (InputSplit split : originalSplits) {
-      totalLength += split.getLength();
+      totalLength += estimator.getEstimatedSize(split);
       String[] locations = split.getLocations();
       if (locations == null || locations.length == 0) {
         locations = emptyLocations;
@@ -328,13 +349,13 @@ public class TezMapReduceSplitsGrouper {
         int groupNumSplits = 0;
         do {
           group.add(splitHolder);
-          groupLength += splitHolder.split.getLength();
+          groupLength += estimator.getEstimatedSize(splitHolder.split);
           groupNumSplits++;
           holder.incrementHeadIndex();
           splitHolder = holder.getUnprocessedHeadSplit();
         } while(splitHolder != null  
             && (!groupByLength || 
-                (groupLength + splitHolder.split.getLength() <= lengthPerGroup))
+                (groupLength + estimator.getEstimatedSize(splitHolder.split) <= lengthPerGroup))
             && (!groupByCount || 
                 (groupNumSplits + 1 <= numSplitsInGroup)));
 

http://git-wip-us.apache.org/repos/asf/tez/blob/26518d5d/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
index 689ea2c..13b69c8 100644
--- a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
+++ b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.split.SplitSizeEstimator;
 import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.MockDNSToSwitchMapping;
@@ -547,4 +548,72 @@ public class TestGroupedSplits {
     split.write(new DataOutputStream(bOut));
   }
 
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test(timeout=10000)
+  public void testGroupedSplitWithEstimator() throws IOException {
+    JobConf job = new JobConf(defaultConf);
+
+    job = (JobConf) TezMapReduceSplitsGrouper.createConfigBuilder(job)
+        .setGroupingSplitSize(12*1000*1000l, 25*1000*1000l)
+        .build();
+
+    InputFormat mockWrappedFormat = mock(InputFormat.class);
+    TezGroupedSplitsInputFormat<LongWritable , Text> format = 
+        new TezGroupedSplitsInputFormat<LongWritable, Text>();
+    format.setConf(job);
+    format.setInputFormat(mockWrappedFormat);
+
+    final InputSplit mockSplit1 = mock(InputSplit.class);
+    final InputSplit mockSplit2 = mock(InputSplit.class);
+    final InputSplit mockSplit3 = mock(InputSplit.class);
+
+    final String[] locations = new String[] { "common", "common", "common" };
+
+    final SplitSizeEstimator estimator = new SplitSizeEstimator() {
+
+      @Override
+      public long getEstimatedSize(InputSplit split) throws IOException {
+        LOG.info("Estimating 10x of " + split.getLength());
+        // 10x compression
+        return 10 * split.getLength();
+      }
+    };
+
+    when(mockSplit1.getLength()).thenReturn(1000 * 1000l);
+    when(mockSplit1.getLocations()).thenReturn(locations);
+
+    when(mockSplit2.getLength()).thenReturn(1000 * 1000l);
+    when(mockSplit2.getLocations()).thenReturn(locations);
+
+    when(mockSplit3.getLength()).thenReturn(2 * 1000 * 1000l + 1);
+    when(mockSplit3.getLocations()).thenReturn(locations);
+
+    // put multiple splits which should be grouped (1,1,2) Mb, but estimated to be 10x
+    // 10,10,20Mb - grouped with min=12Mb, max=25Mb
+    // should be grouped as (1,1),(2)
+    InputSplit[] mockSplits = new InputSplit[] { mockSplit1, mockSplit2,
+        mockSplit3 };
+
+    when(mockWrappedFormat.getSplits((JobConf) anyObject(), anyInt()))
+        .thenReturn(mockSplits);
+
+    format.setDesiredNumberOfSplits(1);
+    format.setSplitSizeEstimator(estimator);
+
+    InputSplit[] splits = format.getSplits(job, 1);
+    // due to the min = 12Mb
+    Assert.assertEquals(2, splits.length);
+
+    for (InputSplit group : splits) {
+      TezGroupedSplit split = (TezGroupedSplit) group;
+      if (split.wrappedSplits.size() == 2) {
+        // split1+split2
+        Assert.assertEquals(split.getLength(), 2 * 1000 * 1000l);
+      } else {
+        // split3
+        Assert.assertEquals(split.getLength(), 2 * 1000 * 1000l + 1);
+      }
+    }
+  }
+
 }


Mime
View raw message