From: jangho@apache.org
To: "commits@nemo.apache.org"
Reply-To: dev@nemo.apache.org
Date: Tue, 07 Aug 2018 06:06:58 +0000
Subject: [incubator-nemo] branch master updated: [NEMO-151] Add OutputWriters for additional tagged outputs (#87)
Message-ID: <153362201832.24638.9208752765651702804@gitbox.apache.org>

This is an automated email from the ASF dual-hosted git repository.

jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

The following commit(s) were added to refs/heads/master by this push:
     new 16c5e73  [NEMO-151] Add OutputWriters for additional tagged outputs (#87)
16c5e73 is described below

commit 16c5e736bdc9184fb601d01853d8660e22e9e84e
Author: Seonghyun Park
AuthorDate: Tue Aug 7 15:06:56 2018 +0900

[NEMO-151] Add OutputWriters for additional tagged outputs (#87)

JIRA: [NEMO-151: Add OutputWriters for additional tagged outputs](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-151)

**Major changes:**
- AdditionalOutputTagProperty is now an edge execution property rather than a vertex execution property.
- Support inter-task writes for additional tagged outputs.

**Minor changes to note:**
- Added some comments regarding VertexHarness.
- Fixed a trivial bug in TaskExecutor#handleDataFetchers when fetching from multiple parents.
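The first bullet under **Major changes** is the heart of the patch: the tag that routes an additional output now lives on the IREdge that carries it, instead of a per-vertex HashMap from tag to destination vertex id. A minimal sketch of the new property class follows; because the diff rendering in this mail drops generic type parameters, the String type parameter here is an assumption inferred from the String-typed constructor in the hunks below.

```java
package edu.snu.nemo.common.ir.edge.executionproperty;

import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;

/**
 * Marks an IREdge as carrying an additional (tagged) output of its source vertex.
 * Sketch only: the String type parameter is assumed, since the diff in this mail lost its generics.
 */
public final class AdditionalOutputTagProperty extends EdgeExecutionProperty<String> {

  /**
   * Constructor.
   * @param value tag id of the additional output.
   */
  private AdditionalOutputTagProperty(final String value) {
    super(value);
  }

  /**
   * Static method exposing the constructor.
   * @param value tag id of the additional output.
   * @return the newly created execution property.
   */
  public static AdditionalOutputTagProperty of(final String value) {
    return new AdditionalOutputTagProperty(value);
  }
}
```

With this shape, the frontend can simply call edge.setProperty(AdditionalOutputTagProperty.of(tag.getId())) on the edge that corresponds to a tagged output, as the NemoPipelineVisitor hunk further down does.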
**Tests for the changes:**
- Add an integration test for PartitionWordsByLengthITCase with the LargeShuffle policy.

**Other comments:**
- N/A

resolves [NEMO-151](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-151)
---
 .../main/java/edu/snu/nemo/common/ContextImpl.java |   4 +-
 .../AdditionalOutputTagProperty.java}              |  21 ++-
 .../nemo/common/ir/vertex/transform/Transform.java |   2 +-
 .../frontend/beam/NemoPipelineVisitor.java         |  32 ++---
 .../reshaping/LargeShuffleRelayReshapingPass.java  |   1 +
 .../nemo/examples/beam/PartitionWordsByLength.java |  58 ++++++---
 .../beam/PartitionWordsByLengthITCase.java         |  23 +++-
 examples/resources/expected_output_tag_long        |  13 +-
 examples/resources/expected_output_tag_short       |  18 ++-
 examples/resources/expected_output_tag_very_long   |   4 +-
 .../resources/expected_output_tag_very_very_long   |   2 +
 examples/resources/test_input_tag                  |  15 ++-
 .../runtime/common/plan/PhysicalPlanGenerator.java |   2 -
 .../runtime/common/plan/StagePartitionerTest.java  |   1 -
 .../nemo/runtime/executor/task/TaskExecutor.java   | 142 ++++++++++++++++++---
 .../nemo/runtime/executor/task/VertexHarness.java  |  22 +++-
 .../runtime/executor/task/TaskExecutorTest.java    |  47 ++++---
 17 files changed, 286 insertions(+), 121 deletions(-)

diff --git a/common/src/main/java/edu/snu/nemo/common/ContextImpl.java b/common/src/main/java/edu/snu/nemo/common/ContextImpl.java
index d1774d3..bfe8c06 100644
--- a/common/src/main/java/edu/snu/nemo/common/ContextImpl.java
+++ b/common/src/main/java/edu/snu/nemo/common/ContextImpl.java
@@ -25,7 +25,7 @@ import java.util.Optional;
  */
 public final class ContextImpl implements Transform.Context {
   private final Map sideInputs;
-  private final Map additionalTagOutputs;
+  private final Map additionalTagOutputs;
   private String data;
 
   /**
@@ -44,7 +44,7 @@ public final class ContextImpl implements Transform.Context {
   }
 
   @Override
-  public Map getAdditionalTagOutputs() {
+  public Map getAdditionalTagOutputs() {
     return this.additionalTagOutputs;
   }
 
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/AdditionalTagOutputProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/AdditionalOutputTagProperty.java
similarity index 54%
rename from common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/AdditionalTagOutputProperty.java
rename to common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/AdditionalOutputTagProperty.java
index dd99100..db1b371 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/AdditionalTagOutputProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/AdditionalOutputTagProperty.java
@@ -13,30 +13,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.common.ir.vertex.executionproperty;
+package edu.snu.nemo.common.ir.edge.executionproperty;
 
-import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
-
-import java.util.HashMap;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 
 /**
- * AdditionalOutput Execution Property for vertex that outputs additional outputs.
+ * Additional Output Tag Execution Property for edge that contains tag for additional outputs.
  */
-public final class AdditionalTagOutputProperty extends VertexExecutionProperty> {
+public final class AdditionalOutputTagProperty extends EdgeExecutionProperty {
+
   /**
    * Constructor.
-   * @param value map of tag to IRVertex id.
+   * @param value tag id of additional input.
    */
-  private AdditionalTagOutputProperty(final HashMap value) {
+  private AdditionalOutputTagProperty(final String value) {
    super(value);
  }

  /**
   * Static method exposing constructor.
-   * @param value map of tag to IRVertex id.
+   * @param value tag id of additional input.
   * @return the newly created execution property.
   */
-  public static AdditionalTagOutputProperty of(final HashMap value) {
-    return new AdditionalTagOutputProperty(value);
+  public static AdditionalOutputTagProperty of(final String value) {
+    return new AdditionalOutputTagProperty(value);
  }
 }
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
index d41f9ea..871d08b 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
@@ -61,7 +61,7 @@ public interface Transform extends Serializable {
   * @return sideInputs.
   */
  Map getSideInputs();
-  Map getAdditionalTagOutputs();
+  Map getAdditionalTagOutputs();

  /**
   * Put serialized data to send to the executor.
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
index ecf6c79..35370b7 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
@@ -16,10 +16,7 @@
 package edu.snu.nemo.compiler.frontend.beam;

 import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.AdditionalTagOutputProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.*;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.compiler.frontend.beam.coder.BeamDecoderFactory;
 import edu.snu.nemo.compiler.frontend.beam.coder.BeamEncoderFactory;
@@ -29,7 +26,6 @@
 import edu.snu.nemo.compiler.frontend.beam.source.BeamBoundedSourceVertex;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.LoopVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
-import edu.snu.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;

 import edu.snu.nemo.compiler.frontend.beam.transform.*;
@@ -110,29 +106,18 @@ public final class NemoPipelineVisitor extends Pipeline.PipelineVisitor.Defaults
     beamNode.getInputs().values().stream().filter(pValueToVertex::containsKey)
         .forEach(pValue -> {
+          final boolean isAdditionalOutput = pValueToTag.containsKey(pValue);
           final IRVertex src = pValueToVertex.get(pValue);
-          final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, irVertex), src, irVertex);
+          final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, irVertex), src, irVertex, false);
           final Pair coderPair = pValueToCoder.get(pValue);
           edge.setProperty(EncoderProperty.of(coderPair.left()));
           edge.setProperty(DecoderProperty.of(coderPair.right()));
           edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
-          this.builder.connectVertices(edge);
-        });
-
-    // This exclusively updates execution property of vertices with additional tagged
outputs. - beamNode.getInputs().values().stream().filter(pValueToTag::containsKey) - .forEach(pValue -> { - final IRVertex src = pValueToVertex.get(pValue); - final TupleTag tag = pValueToTag.get(pValue); - final HashMap tagToVertex = new HashMap<>(); - tagToVertex.put(tag.getId(), irVertex.getId()); - if (!src.getPropertyValue(AdditionalTagOutputProperty.class).isPresent()) { - src.setProperty(AdditionalTagOutputProperty.of(tagToVertex)); - } else { - final HashMap prev = src.getPropertyValue(AdditionalTagOutputProperty.class).get(); - prev.putAll(tagToVertex); - src.setProperty(AdditionalTagOutputProperty.of(prev)); + // Apply AdditionalOutputTatProperty to edges that corresponds to additional outputs. + if (isAdditionalOutput) { + edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(pValue).getId())); } + this.builder.connectVertices(edge); }); } @@ -235,8 +220,7 @@ public final class NemoPipelineVisitor extends Pipeline.PipelineVisitor.Defaults sideInputs.stream().filter(pValueToVertex::containsKey) .forEach(pValue -> { final IRVertex src = pValueToVertex.get(pValue); - final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, irVertex), - src, irVertex, true); + final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, irVertex), src, irVertex, true); final Pair coder = pValueToCoder.get(pValue); edge.setProperty(EncoderProperty.of(coder.left())); edge.setProperty(DecoderProperty.of(coder.right())); diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java index dcbece7..0e476ff 100644 --- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java @@ -60,6 +60,7 @@ public final class LargeShuffleRelayReshapingPass extends ReshapingPass { // before the vertex receiving shuffled data. final OperatorVertex iFileMergerVertex = new OperatorVertex(new RelayTransform()); iFileMergerVertex.getExecutionProperties().put(SkipSerDesProperty.of()); + builder.addVertex(iFileMergerVertex); final IREdge newEdgeToMerger = new IREdge(CommunicationPatternProperty.Value.Shuffle, edge.getSrc(), iFileMergerVertex, edge.isSideInput()); diff --git a/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PartitionWordsByLength.java b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PartitionWordsByLength.java index 5613e3c..e97b33b 100644 --- a/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PartitionWordsByLength.java +++ b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PartitionWordsByLength.java @@ -20,10 +20,7 @@ import edu.snu.nemo.compiler.frontend.beam.NemoPipelineRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.FlatMapElements; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.*; import org.apache.beam.sdk.values.*; import java.util.Arrays; @@ -52,11 +49,13 @@ public final class PartitionWordsByLength { // {} here is required for preserving type information. 
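The PartitionWordsByLength example above is reworked to emit several outputs from a single ParDo. For readers who have not used Beam's tagged outputs, the following self-contained sketch shows the pattern the example relies on; the class name and tags are illustrative and not part of the commit.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public final class TaggedOutputSketch {
  // Anonymous subclasses ({ }) preserve the element type, as the example's comment notes.
  static final TupleTag<String> MAIN = new TupleTag<String>("main") { };
  static final TupleTag<String> SHORT = new TupleTag<String>("short") { };

  /** Routes words shorter than six characters to the "short" tagged output. */
  static PCollectionTuple split(final PCollection<String> words) {
    return words.apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(final ProcessContext c) {
        final String word = c.element();
        if (word.length() < 6) {
          c.output(SHORT, word);  // tagged (additional) output
        } else {
          c.output(word);         // main output
        }
      }
    }).withOutputTags(MAIN, TupleTagList.of(SHORT)));
  }
}
```

Calling split(words).get(SHORT) on the resulting PCollectionTuple yields the tagged collection; in Nemo, each such tagged collection is translated into an IREdge annotated with AdditionalOutputTagProperty.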
// Please see https://stackoverflow.com/a/48431397 for details. - final TupleTag shortWordsTag = new TupleTag() { + final TupleTag> shortWordsTag = new TupleTag>("short") { }; - final TupleTag longWordsTag = new TupleTag() { + final TupleTag> longWordsTag = new TupleTag>("long") { }; - final TupleTag veryLongWordsTag = new TupleTag() { + final TupleTag veryLongWordsTag = new TupleTag("very long") { + }; + final TupleTag veryVeryLongWordsTag = new TupleTag("very very long") { }; final Pipeline p = Pipeline.create(options); @@ -70,27 +69,48 @@ public final class PartitionWordsByLength { @ProcessElement public void processElement(final ProcessContext c) { String word = c.element(); - if (word.length() < 5) { - c.output(shortWordsTag, word); - } else if (word.length() < 8) { - c.output(longWordsTag, word.length()); + if (word.length() < 6) { + c.output(shortWordsTag, KV.of(word.length(), word)); + } else if (word.length() < 11) { + c.output(longWordsTag, KV.of(word.length(), word)); + } else if (word.length() > 12) { + c.output(veryVeryLongWordsTag, word); } else { - c.output(veryLongWordsTag, word); + c.output(word); } } }).withOutputTags(veryLongWordsTag, TupleTagList - .of(longWordsTag) - .and(shortWordsTag))); + .of(shortWordsTag).and(longWordsTag).and(veryVeryLongWordsTag))); - PCollection shortWords = results.get(shortWordsTag); - PCollection longWordLengths = results - .get(longWordsTag) - .apply(MapElements.into(TypeDescriptors.strings()).via(i -> Integer.toString(i))); + PCollection shortWords = results.get(shortWordsTag) + .apply(GroupByKey.create()) + .apply(MapElements.via(new FormatLines())); + PCollection longWords = results.get(longWordsTag) + .apply(GroupByKey.create()) + .apply(MapElements.via(new FormatLines())); PCollection veryLongWords = results.get(veryLongWordsTag); + PCollection veryveryLongWords = results.get(veryVeryLongWordsTag); GenericSourceSink.write(shortWords, outputFilePath + "_short"); - GenericSourceSink.write(longWordLengths, outputFilePath + "_long"); + GenericSourceSink.write(longWords, outputFilePath + "_long"); GenericSourceSink.write(veryLongWords, outputFilePath + "_very_long"); + GenericSourceSink.write(veryveryLongWords, outputFilePath + "_very_very_long"); p.run(); } + + /** + * Formats a key-value pair to a string. 
+ */ + static class FormatLines extends SimpleFunction>, String> { + @Override + public String apply(final KV> input) { + final int length = input.getKey(); + final StringBuilder sb = new StringBuilder(); + for (final String word : input.getValue()) { + sb.append(length).append(": ").append(word).append('\n'); + } + + return sb.toString(); + } + } } diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PartitionWordsByLengthITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PartitionWordsByLengthITCase.java index 96c83a7..bcaf01f 100644 --- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PartitionWordsByLengthITCase.java +++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PartitionWordsByLengthITCase.java @@ -19,6 +19,7 @@ import edu.snu.nemo.client.JobLauncher; import edu.snu.nemo.common.test.ArgBuilder; import edu.snu.nemo.common.test.ExampleTestUtil; import edu.snu.nemo.examples.beam.policy.DefaultPolicyParallelismFive; +import edu.snu.nemo.examples.beam.policy.LargeShufflePolicyParallelismFive; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -46,9 +47,8 @@ public final class PartitionWordsByLengthITCase { @Before public void setUp() throws Exception { builder = new ArgBuilder() - .addResourceJson(executorResourceFileName) - .addUserMain(PartitionWordsByLength.class.getCanonicalName()) - .addUserArgs(inputFilePath, outputFilePath); + .addUserMain(PartitionWordsByLength.class.getCanonicalName()) + .addUserArgs(inputFilePath, outputFilePath); } @After @@ -57,6 +57,7 @@ public final class PartitionWordsByLengthITCase { ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName + "_short", expectedOutputFileName + "_short"); ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName + "_long", expectedOutputFileName + "_long"); ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName + "_very_long", expectedOutputFileName + "_very_long"); + ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName + "_very_very_long", expectedOutputFileName + "_very_very_long"); } finally { ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName); } @@ -65,8 +66,18 @@ public final class PartitionWordsByLengthITCase { @Test (timeout = TIMEOUT) public void test() throws Exception { JobLauncher.main(builder - .addJobId(PartitionWordsByLength.class.getSimpleName()) - .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName()) - .build()); + .addResourceJson(executorResourceFileName) + .addJobId(PartitionWordsByLengthITCase.class.getSimpleName()) + .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName()) + .build()); + } + + @Test (timeout = TIMEOUT) + public void testSailfish() throws Exception { + JobLauncher.main(builder + .addResourceJson(executorResourceFileName) + .addJobId(PartitionWordsByLengthITCase.class.getSimpleName() + "_sailfish") + .addOptimizationPolicy(LargeShufflePolicyParallelismFive.class.getCanonicalName()) + .build()); } } diff --git a/examples/resources/expected_output_tag_long b/examples/resources/expected_output_tag_long index 91dea2c..78c88bf 100644 --- a/examples/resources/expected_output_tag_long +++ b/examples/resources/expected_output_tag_long @@ -1,2 +1,11 @@ -6 -6 +6: foobar +6: barbaz + +7: abcdefg + +8: fooipsum + +9: foobarbaz + +10: ipsumlorem + diff --git a/examples/resources/expected_output_tag_short b/examples/resources/expected_output_tag_short index 72594ed..d9d497b 100644 --- 
a/examples/resources/expected_output_tag_short +++ b/examples/resources/expected_output_tag_short @@ -1,3 +1,15 @@ -foo -bar -qux +1: a + +2: to + +3: foo +3: bar +3: qux + +4: that +4: this + +5: ipsum +5: dolor +5: loren + diff --git a/examples/resources/expected_output_tag_very_long b/examples/resources/expected_output_tag_very_long index 22a2815..f562b43 100644 --- a/examples/resources/expected_output_tag_very_long +++ b/examples/resources/expected_output_tag_very_long @@ -1,2 +1,2 @@ -foobarbaz -ipsumlorem +foobarbazqux +bazquxfoobar diff --git a/examples/resources/expected_output_tag_very_very_long b/examples/resources/expected_output_tag_very_very_long new file mode 100644 index 0000000..5fa149a --- /dev/null +++ b/examples/resources/expected_output_tag_very_very_long @@ -0,0 +1,2 @@ +bazquxfoobarfoobaz +bazquxfoobarfoobar diff --git a/examples/resources/test_input_tag b/examples/resources/test_input_tag index 0cd417b..64934b8 100644 --- a/examples/resources/test_input_tag +++ b/examples/resources/test_input_tag @@ -1,7 +1,20 @@ +a +to foo bar +that +ipsum +dolor foobar barbaz +abcdefg +fooipsum foobarbaz +bazquxfoobarfoobaz +loren +this +foobarbazqux +bazquxfoobar +bazquxfoobarfoobar +qux ipsumlorem -qux \ No newline at end of file diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java index fa95926..364a938 100644 --- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java +++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java @@ -25,7 +25,6 @@ import edu.snu.nemo.common.ir.vertex.*; import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty; import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty; import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty; -import edu.snu.nemo.common.ir.vertex.executionproperty.AdditionalTagOutputProperty; import edu.snu.nemo.conf.JobConf; import edu.snu.nemo.common.dag.DAG; import edu.snu.nemo.common.dag.DAGBuilder; @@ -59,7 +58,6 @@ public final class PhysicalPlanGenerator implements Function additionalOutputMap = irVertex - .getPropertyValue(AdditionalTagOutputProperty.class).orElse(new HashMap<>()); + final Map additionalOutputMap = + getAdditionalOutputMap(irVertex, task.getTaskOutgoingEdges(), irVertexDag); final List isToAdditionalTagOutputs = children.stream() .map(harness -> harness.getIRVertex().getId()) .map(additionalOutputMap::containsValue) .collect(Collectors.toList()); // Handle writes - final List childrenTaskWriters = getChildrenTaskWriters( - taskIndex, irVertex, task.getTaskOutgoingEdges(), dataTransferFactory); // Children-task write + // Main output children task writes + final List mainChildrenTaskWriters = getMainChildrenTaskWriters( + taskIndex, irVertex, task.getTaskOutgoingEdges(), dataTransferFactory, additionalOutputMap); + // Additional output children task writes + final Map additionalChildrenTaskWriters = getAdditionalChildrenTaskWriters( + taskIndex, irVertex, task.getTaskOutgoingEdges(), dataTransferFactory, additionalOutputMap); final List additionalOutputVertices = new ArrayList<>(additionalOutputMap.values()); final OutputCollectorImpl oci = new OutputCollectorImpl(additionalOutputVertices); + // intra-vertex writes final VertexHarness vertexHarness = new VertexHarness(irVertex, oci, children, - isToSideInputs, isToAdditionalTagOutputs, - 
childrenTaskWriters, new ContextImpl(sideInputMap, additionalOutputMap)); // Intra-vertex write + isToSideInputs, isToAdditionalTagOutputs, mainChildrenTaskWriters, additionalChildrenTaskWriters, + new ContextImpl(sideInputMap, additionalOutputMap)); prepareTransform(vertexHarness); vertexIdToHarness.put(irVertex.getId(), vertexHarness); @@ -222,10 +227,11 @@ public final class TaskExecutor { } // Recursively process all of the additional output elements. - vertexHarness.getAdditionalTagOutputChildren().keySet().forEach(tag -> { - while (!outputCollector.isEmpty(tag)) { - final Object element = outputCollector.remove(tag); - handleAdditionalOutputElement(vertexHarness, element, tag); // Recursion + vertexHarness.getContext().getAdditionalTagOutputs().values().forEach(value -> { + final String dstVertexId = (String) value; + while (!outputCollector.isEmpty(dstVertexId)) { + final Object element = outputCollector.remove(dstVertexId); + handleAdditionalOutputElement(vertexHarness, element, dstVertexId); // Recursion } }); } @@ -317,28 +323,49 @@ public final class TaskExecutor { private void finalizeVertex(final VertexHarness vertexHarness) { closeTransform(vertexHarness); - while (!vertexHarness.getOutputCollector().isEmpty()) { - final Object element = vertexHarness.getOutputCollector().remove(); + final OutputCollectorImpl outputCollector = vertexHarness.getOutputCollector(); + + // handle main outputs + while (!outputCollector.isEmpty()) { + final Object element = outputCollector.remove(); handleMainOutputElement(vertexHarness, element); } + + // handle additional tagged outputs + vertexHarness.getAdditionalTagOutputChildren().keySet().forEach(tag -> { + while (!outputCollector.isEmpty(tag)) { + final Object element = outputCollector.remove(tag); + handleAdditionalOutputElement(vertexHarness, element, tag); + } + }); finalizeOutputWriters(vertexHarness); } private void handleMainOutputElement(final VertexHarness harness, final Object element) { - harness.getWritersToChildrenTasks().forEach(outputWriter -> { + // writes to children tasks + harness.getWritersToMainChildrenTasks().forEach(outputWriter -> { outputWriter.write(element); }); + // writes to side input children tasks if (harness.getSideInputChildren().size() > 0) { sideInputMap.put(((OperatorVertex) harness.getIRVertex()).getTransform().getTag(), element); } + // process elements in the next vertices within a task harness.getNonSideInputChildren().forEach(child -> processElementRecursively(child, element)); } private void handleAdditionalOutputElement(final VertexHarness harness, final Object element, final String tag) { - // Inter-task writes are currently not supported. + // writes to additional children tasks + harness.getWritersToAdditionalChildrenTasks().entrySet().stream() + .filter(kv -> kv.getKey().equals(tag)) + .forEach(kv -> { + kv.getValue().write(element); + }); + // writes to side input children tasks if (harness.getSideInputChildren().size() > 0) { sideInputMap.put(((OperatorVertex) harness.getIRVertex()).getTransform().getTag(), element); } + // process elements in the next vertices within a task harness.getAdditionalTagOutputChildren().entrySet().stream() .filter(kv -> kv.getKey().equals(tag)) .forEach(kv -> processElementRecursively(kv.getValue(), element)); @@ -354,7 +381,7 @@ public final class TaskExecutor { // For this looping of available fetchers. 
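The next hunk contains the "trivial bug" fix mentioned in the commit message: the loop iterates over availableFetchers but previously indexed into the full fetchers list, so the two lists disagree as soon as a finished fetcher is removed. The sketch below illustrates the failure mode with a hypothetical Fetcher interface standing in for Nemo's DataFetcher; it is not the real API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Nemo's data-fetching loop, not the real DataFetcher API. */
final class FetcherLoopSketch {
  interface Fetcher {
    Object fetchDataElement(); // returns null here to mean "this parent is finished"
  }

  static void drain(final List<Fetcher> fetchers) {
    // The working copy shrinks as parents finish; 'fetchers' keeps its original size.
    final List<Fetcher> availableFetchers = new ArrayList<>(fetchers);
    while (!availableFetchers.isEmpty()) {
      for (int i = 0; i < availableFetchers.size(); i++) {
        // Correct: index the shrinking list. Using fetchers.get(i) here would pick the
        // wrong parent once any entry has been removed from availableFetchers.
        final Fetcher dataFetcher = availableFetchers.get(i);
        final Object element = dataFetcher.fetchDataElement();
        if (element == null) {
          availableFetchers.remove(i);
          break;
        }
        // process the element ...
      }
    }
  }
}
```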
int finishedFetcherIndex = NONE_FINISHED; for (int i = 0; i < availableFetchers.size(); i++) { - final DataFetcher dataFetcher = fetchers.get(i); + final DataFetcher dataFetcher = availableFetchers.get(i); final Object element; try { element = dataFetcher.fetchDataElement(); @@ -393,6 +420,31 @@ public final class TaskExecutor { ////////////////////////////////////////////// Helper methods for setting up initial data structures + private Map getAdditionalOutputMap(final IRVertex irVertex, + final List outEdgesToChildrenTasks, + final DAG> irVertexDag) { + final Map additionalOutputMap = new HashMap<>(); + + // Add all intra-task additional tags to additional output map. + irVertexDag.getOutgoingEdgesOf(irVertex.getId()) + .stream() + .filter(edge -> edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent()) + .map(edge -> + Pair.of(edge.getPropertyValue(AdditionalOutputTagProperty.class).get(), edge.getDst().getId())) + .forEach(pair -> additionalOutputMap.put(pair.left(), pair.right())); + + // Add all inter-task additional tags to additional output map. + outEdgesToChildrenTasks + .stream() + .filter(edge -> edge.getSrcIRVertex().getId().equals(irVertex.getId())) + .filter(edge -> edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent()) + .map(edge -> + Pair.of(edge.getPropertyValue(AdditionalOutputTagProperty.class).get(), edge.getDstIRVertex().getId())) + .forEach(pair -> additionalOutputMap.put(pair.left(), pair.right())); + + return additionalOutputMap; + } + private Optional getSourceVertexReader(final IRVertex irVertex, final Map irVertexIdToReadable) { if (irVertex instanceof SourceVertex) { @@ -418,18 +470,58 @@ public final class TaskExecutor { .collect(Collectors.toList()); } - private List getChildrenTaskWriters(final int taskIndex, - final IRVertex irVertex, - final List outEdgesToChildrenTasks, - final DataTransferFactory dataTransferFactory) { + /** + * Return inter-task OutputWriters, for single output or output associated with main tag. + * @param taskIndex current task index + * @param irVertex source irVertex + * @param outEdgesToChildrenTasks outgoing edges to child tasks + * @param dataTransferFactory dataTransferFactory + * @param taggedOutputs tag to vertex id map + * @return OutputWriters for main children tasks + */ + private List getMainChildrenTaskWriters(final int taskIndex, + final IRVertex irVertex, + final List outEdgesToChildrenTasks, + final DataTransferFactory dataTransferFactory, + final Map taggedOutputs) { return outEdgesToChildrenTasks .stream() .filter(outEdge -> outEdge.getSrcIRVertex().getId().equals(irVertex.getId())) + .filter(outEdge -> !taggedOutputs.containsValue(outEdge.getDstIRVertex().getId())) .map(outEdgeForThisVertex -> dataTransferFactory .createWriter(irVertex, taskIndex, outEdgeForThisVertex.getDstIRVertex(), outEdgeForThisVertex)) .collect(Collectors.toList()); } + /** + * Return inter-task OutputWriters associated with additional output tags. + * @param taskIndex current task index + * @param irVertex source irVertex + * @param outEdgesToChildrenTasks outgoing edges to child tasks + * @param dataTransferFactory dataTransferFactory + * @param taggedOutputs tag to vertex id map + * @return additional children vertex id to OutputWriters map. 
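The helper methods introduced in this part of TaskExecutor split a task's outgoing edges into two groups: edges without AdditionalOutputTagProperty get plain main-output writers, while tagged edges are collected into a destination-vertex-id to writer map. The simplified sketch below captures that split; Edge and Writer are hypothetical stand-ins for Nemo's StageEdge and OutputWriter, not the real signatures.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

/** 'Edge' and 'Writer' are hypothetical stand-ins for Nemo's StageEdge and OutputWriter. */
final class WriterSplitSketch {
  interface Edge {
    String dstVertexId();
    Optional<String> additionalOutputTag(); // empty for main-output edges
  }

  interface Writer { }

  /** Edges without a tag keep plain main-output writers. */
  static List<Writer> mainWriters(final List<Edge> outEdges, final Function<Edge, Writer> createWriter) {
    return outEdges.stream()
        .filter(e -> !e.additionalOutputTag().isPresent())
        .map(createWriter)
        .collect(Collectors.toList());
  }

  /** Tagged edges become a (destination vertex id -> writer) map, mirroring getAdditionalChildrenTaskWriters. */
  static Map<String, Writer> additionalWriters(final List<Edge> outEdges, final Function<Edge, Writer> createWriter) {
    final Map<String, Writer> writers = new HashMap<>();
    outEdges.stream()
        .filter(e -> e.additionalOutputTag().isPresent())
        .forEach(e -> writers.put(e.dstVertexId(), createWriter.apply(e)));
    return writers;
  }
}
```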
+ */ + private Map getAdditionalChildrenTaskWriters(final int taskIndex, + final IRVertex irVertex, + final List outEdgesToChildrenTasks, + final DataTransferFactory dataTransferFactory, + final Map taggedOutputs) { + final Map additionalChildrenTaskWriters = new HashMap<>(); + + outEdgesToChildrenTasks + .stream() + .filter(outEdge -> outEdge.getSrcIRVertex().getId().equals(irVertex.getId())) + .filter(outEdge -> taggedOutputs.containsValue(outEdge.getDstIRVertex().getId())) + .forEach(outEdgeForThisVertex -> { + additionalChildrenTaskWriters.put(outEdgeForThisVertex.getDstIRVertex().getId(), + dataTransferFactory.createWriter(irVertex, taskIndex, outEdgeForThisVertex.getDstIRVertex(), + outEdgeForThisVertex)); + }); + + return additionalChildrenTaskWriters; + } + private List getChildrenHarnesses(final IRVertex irVertex, final DAG> irVertexDag, final Map vertexIdToHarness) { @@ -486,7 +578,15 @@ public final class TaskExecutor { private void finalizeOutputWriters(final VertexHarness vertexHarness) { final List writtenBytesList = new ArrayList<>(); - vertexHarness.getWritersToChildrenTasks().forEach(outputWriter -> { + // finalize OutputWriters for main children + vertexHarness.getWritersToMainChildrenTasks().forEach(outputWriter -> { + outputWriter.close(); + final Optional writtenBytes = outputWriter.getWrittenBytes(); + writtenBytes.ifPresent(writtenBytesList::add); + }); + + // finalize OutputWriters for additional tagged children + vertexHarness.getWritersToAdditionalChildrenTasks().values().forEach(outputWriter -> { outputWriter.close(); final Optional writtenBytes = outputWriter.getWrittenBytes(); writtenBytes.ifPresent(writtenBytesList::add); diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java index c5f9a78..c79b530 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java @@ -38,14 +38,16 @@ final class VertexHarness { private final List sideInputChildren; private final List nonSideInputChildren; private final Map additionalTagOutputChildren; - private final List writersToChildrenTasks; + private final List writersToMainChildrenTasks; + private final Map writersToAdditionalChildrenTasks; VertexHarness(final IRVertex irVertex, final OutputCollectorImpl outputCollector, final List children, final List isSideInputs, final List isAdditionalTagOutputs, - final List writersToChildrenTasks, + final List writersToMainChildrenTasks, + final Map writersToAdditionalChildrenTasks, final Transform.Context context) { this.irVertex = irVertex; this.outputCollector = outputCollector; @@ -71,7 +73,8 @@ final class VertexHarness { this.sideInputChildren = sides; this.nonSideInputChildren = nonSides; this.additionalTagOutputChildren = tagged; - this.writersToChildrenTasks = writersToChildrenTasks; + this.writersToMainChildrenTasks = writersToMainChildrenTasks; + this.writersToAdditionalChildrenTasks = writersToAdditionalChildrenTasks; this.context = context; } @@ -111,10 +114,17 @@ final class VertexHarness { } /** - * @return OutputWriters of this irVertex. (empty if none exists) + * @return OutputWriters for main outputs of this irVertex. 
(empty if none exists) */ - List getWritersToChildrenTasks() { - return writersToChildrenTasks; + List getWritersToMainChildrenTasks() { + return writersToMainChildrenTasks; + } + + /** + * @return OutputWriters for additional tagged outputs of this irVertex. (empty if none exists) + */ + Map getWritersToAdditionalChildrenTasks() { + return writersToAdditionalChildrenTasks; } /** diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java index 51e6c83..e2fea6c 100644 --- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java +++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java @@ -20,16 +20,19 @@ import edu.snu.nemo.common.ir.OutputCollector; import edu.snu.nemo.common.dag.DAG; import edu.snu.nemo.common.dag.DAGBuilder; import edu.snu.nemo.common.ir.Readable; +import edu.snu.nemo.common.ir.edge.IREdge; +import edu.snu.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty; import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty; import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty; import edu.snu.nemo.common.ir.vertex.InMemorySourceVertex; import edu.snu.nemo.common.ir.vertex.OperatorVertex; -import edu.snu.nemo.common.ir.vertex.executionproperty.AdditionalTagOutputProperty; import edu.snu.nemo.common.ir.vertex.transform.Transform; import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap; import edu.snu.nemo.common.ir.vertex.IRVertex; import edu.snu.nemo.runtime.common.RuntimeIdGenerator; import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap; +import edu.snu.nemo.runtime.common.plan.Stage; import edu.snu.nemo.runtime.common.plan.Task; import edu.snu.nemo.runtime.common.plan.StageEdge; import edu.snu.nemo.runtime.common.plan.RuntimeEdge; @@ -54,8 +57,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; @@ -67,7 +68,7 @@ import static org.mockito.Mockito.*; */ @RunWith(PowerMockRunner.class) @PrepareForTest({InputReader.class, OutputWriter.class, DataTransferFactory.class, - TaskStateManager.class, StageEdge.class, PersistentConnectionToMasterMap.class}) + TaskStateManager.class, StageEdge.class, PersistentConnectionToMasterMap.class, Stage.class, IREdge.class}) public final class TaskExecutorTest { private static final int DATA_SIZE = 100; private static final ExecutionPropertyMap TASK_EXECUTION_PROPERTY_MAP @@ -278,21 +279,21 @@ public final class TaskExecutorTest { final IRVertex bonusVertex1 = new OperatorVertex(new RelayTransform()); final IRVertex bonusVertex2 = new OperatorVertex(new RelayTransform()); - // Tag to vertex map. Mock tags are used. 
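The TaskExecutorTest changes around this point assert that elements emitted under the tags "bonus1" and "bonus2" arrive at bonusVertex1 and bonusVertex2. Conceptually this works because the collector keeps a separate queue per tagged destination alongside the main-output queue, which the executor then drains into the matching writers. Below is a simplified, hypothetical sketch of such a collector; it is not Nemo's actual OutputCollectorImpl.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Simplified, hypothetical tagged-output collector; not Nemo's actual OutputCollectorImpl. */
final class TaggedOutputCollectorSketch<O> {
  private final Queue<O> mainOutputs = new ArrayDeque<>();
  private final Map<String, Queue<O>> taggedOutputs = new HashMap<>();

  /** Elements emitted without a tag go to the main output. */
  void emit(final O element) {
    mainOutputs.add(element);
  }

  /** Elements emitted with a tag are queued for that tag's destination vertex. */
  void emit(final String dstVertexId, final O element) {
    taggedOutputs.computeIfAbsent(dstVertexId, id -> new ArrayDeque<>()).add(element);
  }

  boolean isEmpty(final String dstVertexId) {
    final Queue<O> queue = taggedOutputs.get(dstVertexId);
    return queue == null || queue.isEmpty();
  }

  O remove(final String dstVertexId) {
    return taggedOutputs.get(dstVertexId).remove();
  }

  O removeMain() {
    return mainOutputs.remove();
  }
}
```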
- HashMap tagToVertex = new HashMap<>(); - tagToVertex.put("bonus1", bonusVertex1.getId()); - tagToVertex.put("bonus2", bonusVertex2.getId()); + final RuntimeEdge edge1 = createEdge(routerVertex, mainVertex, false, "edge-1"); + final RuntimeEdge edge2 = createEdge(routerVertex, bonusVertex1, false, "edge-2"); + final RuntimeEdge edge3 = createEdge(routerVertex, bonusVertex2, false, "edge-3"); - routerVertex.setProperty(AdditionalTagOutputProperty.of(tagToVertex)); + edge2.getExecutionProperties().put(AdditionalOutputTagProperty.of("bonus1")); + edge3.getExecutionProperties().put(AdditionalOutputTagProperty.of("bonus2")); final DAG> taskDag = new DAGBuilder>() .addVertex(routerVertex) .addVertex(mainVertex) .addVertex(bonusVertex1) .addVertex(bonusVertex2) - .connectVertices(createEdge(routerVertex, mainVertex, false, "edge-1")) - .connectVertices(createEdge(routerVertex, bonusVertex1, false, "edge-2")) - .connectVertices(createEdge(routerVertex, bonusVertex2, false, "edge-3")) + .connectVertices(edge1) + .connectVertices(edge2) + .connectVertices(edge3) .buildWithoutSourceSinkCheck(); final Task task = new Task( @@ -344,17 +345,23 @@ public final class TaskExecutorTest { } private StageEdge mockStageEdgeFrom(final IRVertex irVertex) { - final StageEdge edge = mock(StageEdge.class); - when(edge.getSrcIRVertex()).thenReturn(irVertex); - when(edge.getDstIRVertex()).thenReturn(new OperatorVertex(new RelayTransform())); - return edge; + return new StageEdge("runtime incoming edge id", + ExecutionPropertyMap.of(mock(IREdge.class), CommunicationPatternProperty.Value.OneToOne), + irVertex, + new OperatorVertex(new RelayTransform()), + mock(Stage.class), + mock(Stage.class), + false); } private StageEdge mockStageEdgeTo(final IRVertex irVertex) { - final StageEdge edge = mock(StageEdge.class); - when(edge.getSrcIRVertex()).thenReturn(new OperatorVertex(new RelayTransform())); - when(edge.getDstIRVertex()).thenReturn(irVertex); - return edge; + return new StageEdge("runtime outgoing edge id", + ExecutionPropertyMap.of(mock(IREdge.class), CommunicationPatternProperty.Value.OneToOne), + new OperatorVertex(new RelayTransform()), + irVertex, + mock(Stage.class), + mock(Stage.class), + false); } /**