Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AC312200BAF for ; Mon, 17 Oct 2016 01:24:07 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id AABA0160AF8; Sun, 16 Oct 2016 23:24:07 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A1AEA160AD0 for ; Mon, 17 Oct 2016 01:24:06 +0200 (CEST) Received: (qmail 61786 invoked by uid 500); 16 Oct 2016 23:24:05 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 61777 invoked by uid 99); 16 Oct 2016 23:24:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 16 Oct 2016 23:24:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 17D9BDFE13; Sun, 16 Oct 2016 23:24:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mingma@apache.org To: commits@tez.apache.org Message-Id: <787caa0e216e4736905e768682067962@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-3430. Make split sorting optional. (mingma) Date: Sun, 16 Oct 2016 23:24:05 +0000 (UTC) archived-at: Sun, 16 Oct 2016 23:24:07 -0000 Repository: tez Updated Branches: refs/heads/master 43f7b5e3a -> 48208dc8c TEZ-3430. Make split sorting optional. 
(mingma) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/48208dc8 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/48208dc8 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/48208dc8 Branch: refs/heads/master Commit: 48208dc8cb835fa6bb2de9e82aecf85cae83e35b Parents: 43f7b5e Author: Ming Ma Authored: Sun Oct 16 16:23:52 2016 -0700 Committer: Ming Ma Committed: Sun Oct 16 16:23:52 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/mapreduce/client/YARNRunner.java | 7 +- .../common/MRInputAMSplitGenerator.java | 13 ++- .../tez/mapreduce/hadoop/MRInputHelpers.java | 64 +++++++-------- .../org/apache/tez/mapreduce/input/MRInput.java | 43 +++++----- .../src/main/proto/MRRuntimeProtos.proto | 1 + .../org/apache/tez/mapreduce/TezTestUtils.java | 81 +++++++++++++++++++ .../common/TestMRInputSplitDistributor.java | 84 +------------------- 8 files changed, 152 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/48208dc8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ef6b890..654e88a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3430. Make split sorting optional. TEZ-3466. Tez classpath building to mimic mapreduce classpath building. TEZ-3453. Correct the downloaded ATS dag data location for analyzer. TEZ-3449. Fix Spelling typos. 
http://git-wip-us.apache.org/repos/asf/tez/blob/48208dc8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java index 820e2e4..00a68cd 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java @@ -818,9 +818,10 @@ public class YARNRunner implements ClientProtocol { private static class MRInputHelpersInternal extends MRInputHelpers { protected static UserPayload createMRInputPayload(Configuration conf, - MRRuntimeProtos.MRSplitsProto mrSplitsProto) throws - IOException { - return MRInputHelpers.createMRInputPayload(conf, mrSplitsProto); + MRRuntimeProtos.MRSplitsProto mrSplitsProto) throws + IOException { + return MRInputHelpers.createMRInputPayload(conf, mrSplitsProto, false, + true); } } http://git-wip-us.apache.org/repos/asf/tez/blob/48208dc8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java index c109664..dbfdcb3 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java @@ -107,9 +107,11 @@ public class MRInputAMSplitGenerator extends InputInitializer { boolean groupSplits = userPayloadProto.getGroupingEnabled(); + boolean sortSplits = userPayloadProto.getSortSplitsEnabled(); LOG.info("Input " + getContext().getInputName() + " asking for " + numTasks - + " tasks. 
Headroom: " + totalResource + " Task Resource: " - + taskResource + " waves: " + waves + ", groupingEnabled: " + groupSplits); + + " tasks. Headroom: " + totalResource + ". Task Resource: " + + taskResource + ". waves: " + waves + ". groupingEnabled: " + + groupSplits + ". SortSplitsEnabled: " + sortSplits); // Read all credentials into the credentials instance stored in JobConf. JobConf jobConf = new JobConf(conf); @@ -117,11 +119,8 @@ public class MRInputAMSplitGenerator extends InputInitializer { InputSplitInfoMem inputSplitInfo = null; - if (groupSplits) { - inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, true, numTasks); - } else { - inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, false, 0); - } + inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, + groupSplits, sortSplits, groupSplits ? numTasks : 0); sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Time to create splits to mem: " + sw.now(TimeUnit.MILLISECONDS)); http://git-wip-us.apache.org/repos/asf/tez/blob/48208dc8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java index 9b88c4d..97e1677 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java @@ -111,7 +111,7 @@ public class MRInputHelpers { InputDescriptor inputDescriptor = InputDescriptor.create(useLegacyInput ? 
MRInputLegacy.class .getName() : MRInput.class.getName()) - .setUserPayload(createMRInputPayload(conf, null)); + .setUserPayload(createMRInputPayload(conf, null, false, true)); Map additionalLocalResources = new HashMap(); updateLocalResourcesForInputSplits(conf, inputSplitInfo, additionalLocalResources); @@ -278,8 +278,8 @@ public class MRInputHelpers { * @throws InterruptedException */ @InterfaceStability.Unstable - public static InputSplitInfoMem generateInputSplitsToMem(Configuration conf, boolean groupSplits, - int targetTasks) + public static InputSplitInfoMem generateInputSplitsToMem(Configuration conf, + boolean groupSplits, boolean sortSplits, int targetTasks) throws IOException, ClassNotFoundException, InterruptedException { InputSplitInfoMem splitInfoMem = null; @@ -290,7 +290,7 @@ public class MRInputHelpers { } Job job = Job.getInstance(conf); org.apache.hadoop.mapreduce.InputSplit[] splits = - generateNewSplits(job, groupSplits, targetTasks); + generateNewSplits(job, groupSplits, sortSplits, targetTasks); splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits), splits.length, job.getCredentials(), job.getConfiguration()); } else { @@ -298,7 +298,7 @@ public class MRInputHelpers { LOG.debug("Generating mapred api input splits"); } org.apache.hadoop.mapred.InputSplit[] splits = - generateOldSplits(jobConf, groupSplits, targetTasks); + generateOldSplits(jobConf, groupSplits, sortSplits, targetTasks); splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits), splits.length, jobConf.getCredentials(), jobConf); } @@ -379,8 +379,8 @@ public class MRInputHelpers { @SuppressWarnings({ "rawtypes", "unchecked" }) private static org.apache.hadoop.mapreduce.InputSplit[] generateNewSplits( - JobContext jobContext, boolean groupSplits, int numTasks) - throws ClassNotFoundException, IOException, + JobContext jobContext, boolean groupSplits, boolean sortSplits, + int numTasks) throws ClassNotFoundException, 
IOException, InterruptedException { Configuration conf = jobContext.getConfiguration(); @@ -413,15 +413,18 @@ public class MRInputHelpers { org.apache.hadoop.mapreduce.InputSplit[] splits = (org.apache.hadoop.mapreduce.InputSplit[]) array .toArray(new org.apache.hadoop.mapreduce.InputSplit[array.size()]); - // sort the splits into order based on size, so that the biggest - // go first - Arrays.sort(splits, new InputSplitComparator()); + if (sortSplits) { + // sort the splits into order based on size, so that the biggest + // go first + Arrays.sort(splits, new InputSplitComparator()); + } return splits; } @SuppressWarnings({ "rawtypes", "unchecked" }) private static org.apache.hadoop.mapred.InputSplit[] generateOldSplits( - JobConf jobConf, boolean groupSplits, int numTasks) throws IOException { + JobConf jobConf, boolean groupSplits, boolean sortSplits, int numTasks) + throws IOException { // This is the real InputFormat org.apache.hadoop.mapred.InputFormat inputFormat; @@ -445,9 +448,11 @@ public class MRInputHelpers { } org.apache.hadoop.mapred.InputSplit[] splits = finalInputFormat .getSplits(jobConf, jobConf.getNumMapTasks()); - // sort the splits into order based on size, so that the biggest - // go first - Arrays.sort(splits, new OldInputSplitComparator()); + if (sortSplits) { + // sort the splits into order based on size, so that the biggest + // go first + Arrays.sort(splits, new OldInputSplitComparator()); + } return splits; } @@ -519,7 +524,7 @@ public class MRInputHelpers { ClassNotFoundException { org.apache.hadoop.mapreduce.InputSplit[] splits = - generateNewSplits(jobContext, false, 0); + generateNewSplits(jobContext, false, true, 0); Configuration conf = jobContext.getConfiguration(); @@ -556,7 +561,7 @@ public class MRInputHelpers { Path inputSplitDir) throws IOException { org.apache.hadoop.mapred.InputSplit[] splits = - generateOldSplits(jobConf, false, 0); + generateOldSplits(jobConf, false, true, 0); JobSplitWriter.createSplitFiles(inputSplitDir, 
jobConf, inputSplitDir.getFileSystem(jobConf), splits); @@ -664,8 +669,8 @@ public class MRInputHelpers { } /** - * Called to specify that grouping of input splits be performed by Tez - * The conf should have the input format class configuration + * When isGrouped is true, it specifies that grouping of input splits be + * performed by Tez. The conf should have the input format class configuration * set to the TezGroupedSplitsInputFormat. The real input format class name * should be passed as an argument to this method. *

@@ -674,27 +679,20 @@ public class MRInputHelpers { * or {@link org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat} */ @InterfaceAudience.Private - protected static UserPayload createMRInputPayloadWithGrouping(Configuration conf) throws IOException { - Preconditions - .checkArgument(conf != null, "Configuration must be specified"); - return createMRInputPayload(TezUtils.createByteStringFromConf(conf), - null, true); - } - - @InterfaceAudience.Private protected static UserPayload createMRInputPayload(Configuration conf, - MRRuntimeProtos.MRSplitsProto mrSplitsProto) throws - IOException { + MRRuntimeProtos.MRSplitsProto mrSplitsProto, boolean isGrouped, + boolean isSorted) throws + IOException { Preconditions .checkArgument(conf != null, "Configuration must be specified"); return createMRInputPayload(TezUtils.createByteStringFromConf(conf), - mrSplitsProto, false); + mrSplitsProto, isGrouped, isSorted); } private static UserPayload createMRInputPayload(ByteString bytes, - MRRuntimeProtos.MRSplitsProto mrSplitsProto, - boolean isGrouped) throws IOException { + MRRuntimeProtos.MRSplitsProto mrSplitsProto, + boolean isGrouped, boolean isSorted) throws IOException { MRRuntimeProtos.MRInputUserPayloadProto.Builder userPayloadBuilder = MRRuntimeProtos.MRInputUserPayloadProto .newBuilder(); @@ -703,7 +701,9 @@ public class MRInputHelpers { userPayloadBuilder.setSplits(mrSplitsProto); } userPayloadBuilder.setGroupingEnabled(isGrouped); - return UserPayload.create(userPayloadBuilder.build().toByteString().asReadOnlyByteBuffer()); + userPayloadBuilder.setSortSplitsEnabled(isSorted); + return UserPayload.create(userPayloadBuilder.build(). 
+ toByteString().asReadOnlyByteBuffer()); } http://git-wip-us.apache.org/repos/asf/tez/blob/48208dc8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java index 1b0ffed..248a92a 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java @@ -113,6 +113,7 @@ public class MRInput extends MRInputBase { final boolean inputFormatProvided; boolean useNewApi; boolean groupSplitsInAM = true; + boolean sortSplitsInAM = true; boolean generateSplitsInAM = true; String inputClassName = MRInput.class.getName(); boolean getCredentialsForSourceFilesystem = true; @@ -191,7 +192,17 @@ public class MRInput extends MRInputBase { groupSplitsInAM = value; return this; } - + + /** + * Set whether splits should be sorted (default true) + * @param value whether to sort splits in the AM or not + * @return {@link org.apache.tez.mapreduce.input.MRInput.MRInputConfigBuilder} + */ + public MRInputConfigBuilder sortSplits(boolean value) { + sortSplitsInAM = value; + return this; + } + /** * Set whether splits should be generated in the Tez App Master (default true) * @param value whether to generate splits in the AM or not @@ -266,7 +277,7 @@ public class MRInput extends MRInputBase { InputSplitInfo inputSplitInfo; setupBasicConf(conf); try { - inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(conf, false, 0); + inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(conf, false, true, 0); } catch (Exception e) { throw new TezUncheckedException(e); } @@ -298,12 +309,8 @@ public class MRInput extends MRInputBase { Collection uris = maybeGetURIsForCredentials(); - UserPayload payload = null; - if (groupSplitsInAM) { - payload = 
MRInputHelpersInternal.createMRInputPayloadWithGrouping(conf); - } else { - payload = MRInputHelpersInternal.createMRInputPayload(conf, null); - } + UserPayload payload = MRInputHelpersInternal.createMRInputPayload( + conf, groupSplitsInAM, sortSplitsInAM); DataSourceDescriptor ds = DataSourceDescriptor .create(InputDescriptor.create(inputClassName).setUserPayload(payload), @@ -326,12 +333,8 @@ public class MRInput extends MRInputBase { Collection uris = maybeGetURIsForCredentials(); - UserPayload payload = null; - if (groupSplitsInAM) { - payload = MRInputHelpersInternal.createMRInputPayloadWithGrouping(conf); - } else { - payload = MRInputHelpersInternal.createMRInputPayload(conf, null); - } + UserPayload payload = MRInputHelpersInternal.createMRInputPayload( + conf, groupSplitsInAM, sortSplitsInAM); DataSourceDescriptor ds = DataSourceDescriptor.create( InputDescriptor.create(inputClassName).setUserPayload(payload), @@ -703,15 +706,17 @@ public class MRInput extends MRInputBase { private static class MRInputHelpersInternal extends MRInputHelpers { - protected static UserPayload createMRInputPayloadWithGrouping(Configuration conf) throws - IOException { - return MRInputHelpers.createMRInputPayloadWithGrouping(conf); + protected static UserPayload createMRInputPayload(Configuration conf, + boolean isGrouped, boolean isSorted) throws IOException { + return MRInputHelpers.createMRInputPayload(conf, null, isGrouped, + isSorted); } protected static UserPayload createMRInputPayload(Configuration conf, - MRRuntimeProtos.MRSplitsProto mrSplitsProto) throws + MRRuntimeProtos.MRSplitsProto mrSplitsProto) throws IOException { - return MRInputHelpers.createMRInputPayload(conf, mrSplitsProto); + return MRInputHelpers.createMRInputPayload(conf, mrSplitsProto, false, + true); } } http://git-wip-us.apache.org/repos/asf/tez/blob/48208dc8/tez-mapreduce/src/main/proto/MRRuntimeProtos.proto ---------------------------------------------------------------------- diff --git 
a/tez-mapreduce/src/main/proto/MRRuntimeProtos.proto b/tez-mapreduce/src/main/proto/MRRuntimeProtos.proto index 646eb3a..8cba5fe 100644 --- a/tez-mapreduce/src/main/proto/MRRuntimeProtos.proto +++ b/tez-mapreduce/src/main/proto/MRRuntimeProtos.proto @@ -38,4 +38,5 @@ message MRInputUserPayloadProto { optional bytes configuration_bytes = 1; optional MRSplitsProto splits = 2; optional bool grouping_enabled = 3; + optional bool sort_splits_enabled = 4 [default = true]; } http://git-wip-us.apache.org/repos/asf/tez/blob/48208dc8/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java index 8a8b141..8912ad2 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java @@ -18,10 +18,17 @@ package org.apache.tez.mapreduce; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.InputInitializerContext; + +import java.io.IOException; +import java.util.Set; public class TezTestUtils { @@ -45,4 +52,78 @@ public class TezTestUtils { jobId), vertexId), taskId); } + + public static class TezRootInputInitializerContextForTest implements + InputInitializerContext { + + private final ApplicationId appId; + private final UserPayload payload; + + public TezRootInputInitializerContextForTest(UserPayload payload) throws IOException { + appId = ApplicationId.newInstance(1000, 200); + 
this.payload = payload == null ? UserPayload.create(null) : payload; + } + + @Override + public ApplicationId getApplicationId() { + return appId; + } + + @Override + public String getDAGName() { + return "FakeDAG"; + } + + @Override + public String getInputName() { + return "MRInput"; + } + + @Override + public UserPayload getInputUserPayload() { + return payload; + } + + @Override + public int getNumTasks() { + return 100; + } + + @Override + public Resource getVertexTaskResource() { + return Resource.newInstance(1024, 1); + } + + @Override + public Resource getTotalAvailableResource() { + return Resource.newInstance(10240, 10); + } + + @Override + public int getNumClusterNodes() { + return 10; + } + + @Override + public int getDAGAttemptNumber() { + return 1; + } + + @Override + public int getVertexNumTasks(String vertexName) { + throw new UnsupportedOperationException("getVertexNumTasks not implemented in this mock"); + } + + @Override + public void registerForVertexStateUpdates(String vertexName, Set stateSet) { + throw new UnsupportedOperationException("getVertexNumTasks not implemented in this mock"); + } + + @Override + public UserPayload getUserPayload() { + throw new UnsupportedOperationException("getUserPayload not implemented in this mock"); + } + + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/48208dc8/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java index cdf1ee4..3772cde 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java @@ -29,17 +29,12 @@ import static 
org.junit.Assert.assertTrue; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.EnumSet; import java.util.List; -import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.tez.dag.api.event.VertexState; -import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.mapreduce.TezTestUtils; import org.apache.tez.mapreduce.hadoop.MRInputHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.mapreduce.lib.MRInputUtils; @@ -75,7 +70,7 @@ public class TestMRInputSplitDistributor { UserPayload userPayload = UserPayload.create(payloadProto.build().toByteString().asReadOnlyByteBuffer()); - InputInitializerContext context = new TezRootInputInitializerContextForTest(userPayload); + InputInitializerContext context = new TezTestUtils.TezRootInputInitializerContextForTest(userPayload); MRInputSplitDistributor splitDist = new MRInputSplitDistributor(context); List events = splitDist.initialize(); @@ -124,7 +119,7 @@ public class TestMRInputSplitDistributor { UserPayload userPayload = UserPayload.create(payloadProto.build().toByteString().asReadOnlyByteBuffer()); - InputInitializerContext context = new TezRootInputInitializerContextForTest(userPayload); + InputInitializerContext context = new TezTestUtils.TezRootInputInitializerContextForTest(userPayload); MRInputSplitDistributor splitDist = new MRInputSplitDistributor(context); List events = splitDist.initialize(); @@ -150,79 +145,6 @@ public class TestMRInputSplitDistributor { assertEquals(2, ((InputSplitForTest) diEvent2.getDeserializedUserPayload()).identifier); } - private static class TezRootInputInitializerContextForTest implements - InputInitializerContext { - - private final 
ApplicationId appId; - private final UserPayload payload; - - TezRootInputInitializerContextForTest(UserPayload payload) throws IOException { - appId = ApplicationId.newInstance(1000, 200); - this.payload = payload == null ? UserPayload.create(null) : payload; - } - - @Override - public ApplicationId getApplicationId() { - return appId; - } - - @Override - public String getDAGName() { - return "FakeDAG"; - } - - @Override - public String getInputName() { - return "MRInput"; - } - - @Override - public UserPayload getInputUserPayload() { - return payload; - } - - @Override - public int getNumTasks() { - return 100; - } - - @Override - public Resource getVertexTaskResource() { - return Resource.newInstance(1024, 1); - } - - @Override - public Resource getTotalAvailableResource() { - return Resource.newInstance(10240, 10); - } - - @Override - public int getNumClusterNodes() { - return 10; - } - - @Override - public int getDAGAttemptNumber() { - return 1; - } - - @Override - public int getVertexNumTasks(String vertexName) { - throw new UnsupportedOperationException("getVertexNumTasks not implemented in this mock"); - } - - @Override - public void registerForVertexStateUpdates(String vertexName, Set stateSet) { - throw new UnsupportedOperationException("getVertexNumTasks not implemented in this mock"); - } - - @Override - public UserPayload getUserPayload() { - throw new UnsupportedOperationException("getUserPayload not implemented in this mock"); - } - - } - @Private private static class InputSplitForTest implements InputSplit {