Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E62B317D96 for ; Tue, 21 Oct 2014 22:00:14 +0000 (UTC) Received: (qmail 32917 invoked by uid 500); 21 Oct 2014 22:00:14 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 32889 invoked by uid 500); 21 Oct 2014 22:00:13 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 32880 invoked by uid 99); 21 Oct 2014 22:00:13 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Oct 2014 22:00:13 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id AA22E93E0BF; Tue, 21 Oct 2014 22:00:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bikas@apache.org To: commits@tez.apache.org Message-Id: <4f29dfa8341f4a8698f380e8574fa4b9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-1396. Grouping should generate consistent groups when given the same set of splits (bikas) Date: Tue, 21 Oct 2014 22:00:13 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master fdebd1994 -> 7a802b13d TEZ-1396. Grouping should generate consistent groups when given the same set of splits (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7a802b13 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7a802b13 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7a802b13 Branch: refs/heads/master Commit: 7a802b13d19c09ffe7bf0c331e0fac7234d0063b Parents: fdebd19 Author: Bikas Saha Authored: Tue Oct 21 14:59:56 2014 -0700 Committer: Bikas Saha Committed: Tue Oct 21 14:59:56 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../mapred/split/TezMapredSplitsGrouper.java | 13 +++- .../split/TezMapReduceSplitsGrouper.java | 25 +++++++- .../hadoop/mapred/split/TestGroupedSplits.java | 62 ++++++++++++++++++++ 4 files changed, 97 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/7a802b13/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 314338e..3b85ff8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -49,6 +49,8 @@ ALL CHANGES: TEZ-1668. InputInitializers should be able to register for Vertex state updates in the constructor. TEZ-1656. Grouping of splits should maintain the original ordering of splits within a group + TEZ-1396. Grouping should generate consistent groups when given the same set + of splits Release 0.5.1: 2014-10-02 http://git-wip-us.apache.org/repos/asf/tez/blob/7a802b13/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java index 022167e..af39948 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -80,6 +81,14 @@ public class TezMapredSplitsGrouper { } } + Map createLocationsMap(Configuration conf) { + if (conf.getBoolean(TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE, + TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE_DEFAULT)) { + return new TreeMap(); + } + return new HashMap(); + } + public InputSplit[] getGroupedSplits(Configuration conf, InputSplit[] originalSplits, int desiredNumSplits, String wrappedInputFormatName) throws IOException { @@ -171,7 +180,7 @@ public class TezMapredSplitsGrouper { List groupedSplitsList = new ArrayList(desiredNumSplits); long totalLength = 0; - Map distinctLocations = new HashMap(); + Map distinctLocations = createLocationsMap(conf); // go through splits and add them to locations for (InputSplit split : originalSplits) { totalLength += split.getLength(); @@ -347,7 +356,7 @@ public class TezMapredSplitsGrouper { // splits is expected to be much smaller RackResolver.init(conf); Map locToRackMap = new HashMap(distinctLocations.size()); - Map rackLocations = new HashMap(); + Map rackLocations = createLocationsMap(conf); for (String location : distinctLocations.keySet()) { String rack = emptyLocation; if (location != emptyLocation) { http://git-wip-us.apache.org/repos/asf/tez/blob/7a802b13/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java index 5fe9f59..1b919da 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java @@ -25,6 +25,9 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; + +import javax.annotation.Nullable; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -96,6 +99,14 @@ public class TezMapReduceSplitsGrouper { public static final String TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION = "tez.grouping.rack-split-reduction"; public static final float TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT = 0.75f; + + /** + * Repeated invocations of grouping on the same splits with the same parameters will produce the + * same groups. This may help in cache reuse but may cause hot-spotting on nodes when there are a + * large number of jobs reading the same hot data. True by default. + */ + public static final String TEZ_GROUPING_REPEATABLE = "tez.grouping.repeatable"; + public static final boolean TEZ_GROUPING_REPEATABLE_DEFAULT = true; class SplitHolder { InputSplit split; @@ -129,6 +140,14 @@ public class TezMapReduceSplitsGrouper { } } + Map createLocationsMap(Configuration conf) { + if (conf.getBoolean(TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE, + TezMapReduceSplitsGrouper.TEZ_GROUPING_REPEATABLE_DEFAULT)) { + return new TreeMap(); + } + return new HashMap(); + } + public List getGroupedSplits(Configuration conf, List originalSplits, int desiredNumSplits, String wrappedInputFormatName) throws IOException, InterruptedException { @@ -217,7 +236,7 @@ public class TezMapReduceSplitsGrouper { groupedSplits = new ArrayList(desiredNumSplits); long totalLength = 0; - Map distinctLocations = new HashMap(); + Map distinctLocations = createLocationsMap(conf); // go through splits and add them to locations for (InputSplit split : originalSplits) { totalLength += split.getLength(); @@ -393,7 +412,7 @@ public class TezMapReduceSplitsGrouper { // splits is expected to be much smaller RackResolver.init(conf); Map locToRackMap = new HashMap(distinctLocations.size()); - Map rackLocations = new HashMap(); + Map rackLocations = createLocationsMap(conf); for (String location : distinctLocations.keySet()) { String rack = emptyLocation; if (location != emptyLocation) { @@ -508,7 +527,7 @@ public class TezMapReduceSplitsGrouper { /** * This configuration will be modified in place */ - private TezMRSplitsGrouperConfigBuilder(Configuration conf) { + private TezMRSplitsGrouperConfigBuilder(@Nullable Configuration conf) { if (conf == null) { conf = new Configuration(false); } http://git-wip-us.apache.org/repos/asf/tez/blob/7a802b13/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java index 72ea035..1560e90 100644 --- a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java +++ b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java @@ -409,6 +409,68 @@ public class TestGroupedSplits { } } + @Test (timeout=5000) + public void testRepeatableSplits() throws IOException { + int numLocations = 3; + String[] locations = new String[numLocations]; + InputSplit[] origSplits = new InputSplit[numLocations*4]; + long splitLength = 100; + for (int i=0; i testSplits1 = gSplit1.getGroupedSplits(); + TezGroupedSplit gSplit2 = ((TezGroupedSplit) groupedSplits2[i]); + List testSplits2 = gSplit2.getGroupedSplits(); + Assert.assertEquals(testSplits1.size(), testSplits2.size()); + for (int j=0; j