From: johnyangk@apache.org
To: "commits@nemo.apache.org"
Reply-To: dev@nemo.apache.org
Date: Tue, 14 Aug 2018 09:51:45 +0000
Subject: [incubator-nemo] branch master updated: [NEMO-64] Fix map stage hang under DataSkewPolicy (#99)
X-Git-Repo: incubator-nemo
X-Git-Refname: refs/heads/master
X-Git-Oldrev: 1b9bfa070eb356ce079b9d32013249f9a18ac45e
X-Git-Newrev: d59373f59a2b82a170fe581d944364a63e015365

This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

The following commit(s) were added to refs/heads/master by this push:
     new d59373f  [NEMO-64] Fix map stage hang under DataSkewPolicy (#99)

d59373f is described below

commit d59373f59a2b82a170fe581d944364a63e015365
Author: Jeongyoon Eo
AuthorDate: Tue Aug 14 18:51:42 2018 +0900

    [NEMO-64] Fix map stage hang under DataSkewPolicy (#99)

    JIRA: [NEMO-64: Fix map stage hang under DataSkewPolicy](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-64), [NEMO-181: Fix DataSkewPolicy bug for multiple shuffles](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-181)

    **Major changes:**
    - Data for dynamic optimization is now aggregated at `DynOptDataHandler` in the Scheduler instead of in a variable of `MetricCollectionBarrierVertex`. Updating `MetricCollectionBarrierVertex` as data arrived in the RuntimeMaster clashed with serializing the IR DAG for scheduled Tasks, which caused a `ConcurrentModificationException` and silently killed the Scheduler.
    - The target edge to optimize is identified via `MetricCollectionProperty`, so that the correct edge is chosen when multiple shuffles are involved.
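As a rough illustration of the first major change, the Java sketch below contrasts two behaviors. The class `DynOptDataAggregatorSketch` and its methods are hypothetical and are not Nemo's `DynOptDataHandler` API. First, a plain `HashMap` iterator is fail-fast: a structural update in the middle of an iteration makes the next `next()` call throw `ConcurrentModificationException`. Second, an aggregator merges per-key partition sizes from task reports into a `ConcurrentHashMap` and hands the optimizer an immutable snapshot. The fail-fast case is shown single-threaded because a cross-thread race is not deterministic; with concurrent writers the exception is best-effort, not guaranteed. The sketch assumes Java 9+ for `Map.of`.

```java
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: not Nemo's actual DynOptDataHandler API.
public final class DynOptDataAggregatorSketch {
  // Partition key -> accumulated partition size, merged from many task reports.
  private final Map<Integer, Long> partitionSizes = new ConcurrentHashMap<>();

  /** Merges one task's per-key partition sizes into the running totals. */
  public void updateDynOptData(final Map<Integer, Long> reported) {
    reported.forEach((key, size) -> partitionSizes.merge(key, size, Long::sum));
  }

  /** Returns an immutable copy that the optimizer can read while new reports keep arriving. */
  public Map<Integer, Long> snapshot() {
    return Collections.unmodifiableMap(new HashMap<>(partitionSizes));
  }

  /** Returns true if mutating a plain HashMap mid-iteration triggers its fail-fast check. */
  static boolean plainMapFailsFast() {
    final Map<Integer, Long> plain = new HashMap<>();
    plain.put(0, 1L);
    plain.put(1, 1L);
    try {
      final Iterator<Integer> it = plain.keySet().iterator();
      it.next();
      plain.put(2, 1L); // structural modification while the iterator is still in use
      it.next();        // the iterator detects the changed modCount and throws
      return false;
    } catch (final ConcurrentModificationException e) {
      return true;
    }
  }

  public static void main(final String[] args) {
    final DynOptDataAggregatorSketch handler = new DynOptDataAggregatorSketch();
    handler.updateDynOptData(Map.of(0, 10L, 1, 5L));
    handler.updateDynOptData(Map.of(0, 7L, 2, 3L));
    final Map<Integer, Long> snap = handler.snapshot();
    handler.updateDynOptData(Map.of(1, 100L)); // does not affect the earlier snapshot
    System.out.println(snap.get(0) + " " + snap.get(1) + " " + snap.get(2)); // 17 5 3
    System.out.println(plainMapFailsFast()); // true
  }
}
```

Copying into a snapshot trades a little memory for isolation: later reports keep accumulating in the live map without disturbing a plan computation that is already reading the copy.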
    **Minor changes to note:**
    - Removed now-unused entries in `dataSizeMetricMsg`
    - Added `DataSkewMetricProperty` and `MetricFactory` to expose the task hash ranges of shuffle edges as an execution property, so that a RuntimePass can optimize the given IR DAG

    **Tests for the changes:**
    - N/A (no new features; covered by `PerKeyMedianITCase`)

    **Other comments:**
    - In an upcoming PR for [NEMO-99](https://issues.apache.org/jira/browse/NEMO-99), data for dynamic optimization will be aggregated in a designated vertex rather than in the RuntimeMaster

    resolves [NEMO-64](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-64), [NEMO-181](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-181)
---
 .../main/java/edu/snu/nemo/client/JobLauncher.java | 1 +
 .../edu/snu/nemo/common/DataSkewMetricFactory.java | 36 ++++++++
 .../main/java/edu/snu/nemo/common}/HashRange.java | 21 +++--
 .../main/java/edu/snu/nemo/common}/KeyRange.java | 2 +-
 .../java/edu/snu/nemo/common/MetricFactory.java | 26 ++++++
 .../executionproperty/DataSkewMetricProperty.java | 43 +++++++++
 .../edu/snu/nemo/common/ir/vertex/IRVertex.java | 9 ++
 .../nemo/compiler/backend/nemo/NemoBackend.java | 2 +-
 .../compiletime/annotating/DefaultMetricPass.java | 58 ++++++++++++
 .../annotating/SkewResourceSkewedDataPass.java | 10 +-
 .../composite/DefaultCompositePass.java | 1 +
 .../compiler/optimizer/policy/DataSkewPolicy.java | 8 --
 .../optimizer/policy/PolicyBuilderTest.java | 6 +-
 conf/src/main/java/edu/snu/nemo/conf/JobConf.java | 1 -
 .../nemo/runtime/common/RuntimeIdGenerator.java | 3 +-
 .../eventhandler/DynamicOptimizationEvent.java | 31 ++++---
 .../DynamicOptimizationEventHandler.java | 9 +-
 .../runtime/common/optimizer/RunTimeOptimizer.java | 40 +++---
 .../pass/runtime/DataSkewRuntimePass.java | 101 ++++++++++-----------
 .../common/optimizer/pass/runtime/RuntimePass.java | 7 +-
 .../snu/nemo/runtime/common/plan/PhysicalPlan.java | 11 +++
 .../runtime/common/plan/PhysicalPlanGenerator.java | 13 ++-
 .../snu/nemo/runtime/common/plan/StageEdge.java | 31 -------
 .../nemo/runtime/common/plan/StagePartitioner.java | 4 +-
 .../edu/snu/nemo/runtime/common/plan/Task.java | 4 +-
 runtime/common/src/main/proto/ControlMessage.proto | 2 -
 .../pass/runtime/DataSkewRuntimePassTest.java | 4 +-
 .../runtime/executor/data/BlockManagerWorker.java | 4 +-
 .../nemo/runtime/executor/data/block/Block.java | 2 +-
 .../runtime/executor/data/block/FileBlock.java | 2 +-
 .../data/block/NonSerializedMemoryBlock.java | 2 +-
 .../executor/data/block/SerializedMemoryBlock.java | 2 +-
 .../runtime/executor/datatransfer/InputReader.java | 14 +-
 .../nemo/runtime/executor/data/BlockStoreTest.java | 4 +-
 .../snu/nemo/runtime/executor/data/BlockTest.java | 2 +-
 .../executor/datatransfer/DataTransferTest.java | 21 +++++
 .../runtime/master/DataSkewDynOptDataHandler.java | 61 +++++++++++++
 .../snu/nemo/runtime/master/DynOptDataHandler.java | 33 +++++++
 .../edu/snu/nemo/runtime/master/RuntimeMaster.java | 49 +---------
 .../master/resource/ExecutorRepresenter.java | 2 -
 .../runtime/master/scheduler/BatchScheduler.java | 79 +++++++++-------
 .../scheduler/NodeShareSchedulingConstraint.java | 2 +-
 .../scheduler/SchedulingConstraintRegistry.java | 2 +-
 .../SkewnessAwareSchedulingConstraint.java | 19 ++--
 .../runtime/master/scheduler/TaskDispatcher.java | 1 -
 .../SkewnessAwareSchedulingConstraintTest.java | 59 +++++++-----
 .../runtime/common/plan/TestPlanGenerator.java | 2 +-
 47 files changed, 558 insertions(+), 288 deletions(-)

diff --git a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
index f0ae953..5950854 100644
--- a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
+++ b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
@@ -315,6 +315,7 @@ public final class JobLauncher {
     cl.registerShortNameOfClass(JobConf.PartitionTransportServerNumListeningThreads.class);
     cl.registerShortNameOfClass(JobConf.PartitionTransportServerNumWorkingThreads.class);
     cl.registerShortNameOfClass(JobConf.PartitionTransportClientNumThreads.class);
+    cl.registerShortNameOfClass(JobConf.MaxNumDownloadsForARuntimeEdge.class);
     cl.processCommandLine(args);
     return confBuilder.build();
   }
diff --git a/common/src/main/java/edu/snu/nemo/common/DataSkewMetricFactory.java b/common/src/main/java/edu/snu/nemo/common/DataSkewMetricFactory.java
new file mode 100644
index 0000000..2847d8d
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/DataSkewMetricFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common;
+
+import java.util.Map;
+
+/**
+ * A {@link MetricFactory} which is used for data skew handling.
+ */
+public final class DataSkewMetricFactory implements MetricFactory> {
+  private Map metric;
+
+  /**
+   * Default constructor.
+   */
+  public DataSkewMetricFactory(final Map metric) {
+    this.metric = metric;
+  }
+
+  public Map getMetric() {
+    return metric;
+  }
+}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/HashRange.java b/common/src/main/java/edu/snu/nemo/common/HashRange.java
similarity index 90%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/HashRange.java
rename to common/src/main/java/edu/snu/nemo/common/HashRange.java
index 50e4334..cdc7d49 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/HashRange.java
+++ b/common/src/main/java/edu/snu/nemo/common/HashRange.java
@@ -13,7 +13,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.data;
+package edu.snu.nemo.common;
+
+import java.util.Arrays;
 
 /**
  * Descriptor for hash range.
@@ -109,22 +111,23 @@ public final class HashRange implements KeyRange {
       return false;
     }
     final HashRange hashRange = (HashRange) o;
-    if (rangeBeginInclusive != hashRange.rangeBeginInclusive) {
+    if (rangeBeginInclusive != hashRange.rangeBeginInclusive
+        || rangeEndExclusive != hashRange.rangeEndExclusive
+        || isSkewed != hashRange.isSkewed) {
       return false;
     }
-    return rangeEndExclusive == hashRange.rangeEndExclusive;
+    return true;
   }
 
   @Override
   public int hashCode() {
-    int result = rangeBeginInclusive;
-    result = 31 * result + rangeEndExclusive;
-    return result;
+    return Arrays.hashCode(new Object[] {
+        rangeBeginInclusive,
+        rangeEndExclusive,
+        isSkewed,
+    });
   }
 
-  public void setAsSkewed() {
-    isSkewed = true;
-  }
   public boolean isSkewed() {
     return isSkewed;
   }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/KeyRange.java b/common/src/main/java/edu/snu/nemo/common/KeyRange.java
similarity index 97%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/KeyRange.java
rename to common/src/main/java/edu/snu/nemo/common/KeyRange.java
index 0c46fc5..6fcac36 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/KeyRange.java
+++ b/common/src/main/java/edu/snu/nemo/common/KeyRange.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.data;
+package edu.snu.nemo.common;
 
 import java.io.Serializable;
 
diff --git a/common/src/main/java/edu/snu/nemo/common/MetricFactory.java b/common/src/main/java/edu/snu/nemo/common/MetricFactory.java
new file mode 100644
index 0000000..a98b81b
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/MetricFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common;
+
+import java.io.Serializable;
+
+/**
+ * A serializable metric factory.
+ *
+ * @param metric type.
+ */
+public interface MetricFactory extends Serializable {
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataSkewMetricProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataSkewMetricProperty.java
new file mode 100644
index 0000000..a5f37a9
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataSkewMetricProperty.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.edge.executionproperty;
+
+import edu.snu.nemo.common.DataSkewMetricFactory;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
+
+/**
+ * DataSkewMetric ExecutionProperty.
+ */
+public final class DataSkewMetricProperty extends EdgeExecutionProperty {
+  /**
+   * Constructor.
+   *
+   * @param value value of the execution property.
+   */
+  private DataSkewMetricProperty(final DataSkewMetricFactory value) {
+    super(value);
+  }
+
+  /**
+   * Static method exposing the constructor.
+   *
+   * @param value value of the new execution property.
+   * @return the newly created execution property.
+   */
+  public static DataSkewMetricProperty of(final DataSkewMetricFactory value) {
+    return new DataSkewMetricProperty(value);
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
index 67fc42d..09ebfbf 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
@@ -29,6 +29,7 @@ import java.util.Optional;
  */
 public abstract class IRVertex extends Vertex {
   private final ExecutionPropertyMap executionProperties;
+  private boolean stagePartitioned;
 
   /**
    * Constructor of IRVertex.
@@ -36,6 +37,7 @@ public abstract class IRVertex extends Vertex {
   public IRVertex() {
     super(IdManager.newVertexId());
     this.executionProperties = ExecutionPropertyMap.of(this);
+    this.stagePartitioned = false;
   }
 
   /**
@@ -89,6 +91,13 @@ public abstract class IRVertex extends Vertex {
     return executionProperties;
   }
 
+  public final void setStagePartitioned() {
+    stagePartitioned = true;
+  }
+  public final boolean getStagePartitioned() {
+    return stagePartitioned;
+  }
+
   /**
    * @return IRVertex properties in String form.
    */
diff --git a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 3e933a2..da7adb6 100644
--- a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++ b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -61,6 +61,6 @@ public final class NemoBackend implements Backend {
   public PhysicalPlan compile(final DAG irDAG,
                               final PhysicalPlanGenerator physicalPlanGenerator) {
     final DAG stageDAG = physicalPlanGenerator.apply(irDAG);
-    return new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), stageDAG);
+    return new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), irDAG, stageDAG);
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultMetricPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultMetricPass.java
new file mode 100644
index 0000000..c96ac0b
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultMetricPass.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.DataSkewMetricFactory;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataSkewMetricProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Pass for initiating IREdge Metric ExecutionProperty with default key range.
+ */
+public final class DefaultMetricPass extends AnnotatingPass {
+  /**
+   * Default constructor.
+   */
+  public DefaultMetricPass() {
+    super(DataSkewMetricProperty.class);
+  }
+
+  @Override
+  public DAG apply(final DAG dag) {
+    dag.topologicalDo(dst ->
+      dag.getIncomingEdgesOf(dst).forEach(edge -> {
+        if (CommunicationPatternProperty.Value.Shuffle
+            .equals(edge.getPropertyValue(CommunicationPatternProperty.class).get())) {
+          final int parallelism = dst.getPropertyValue(ParallelismProperty.class).get();
+          final Map metric = new HashMap<>();
+          for (int i = 0; i < parallelism; i++) {
+            metric.put(i, HashRange.of(i, i + 1, false));
+          }
+          edge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(metric)));
+        }
+      }));
+    return dag;
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
index e2ac70f..c5bc1d9 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
@@ -53,10 +53,16 @@ public final class SkewResourceSkewedDataPass extends AnnotatingPass {
         .filter(v -> v instanceof MetricCollectionBarrierVertex)
         .forEach(v -> v.setProperty(DynamicOptimizationProperty
             .of(DynamicOptimizationProperty.Value.DataSkewRuntimePass)));
+
     dag.getVertices().stream()
         .filter(v -> hasMetricCollectionBarrierVertexAsParent(dag, v)
-            && !v.getPropertyValue(ResourceSkewedDataProperty.class).isPresent())
-        .forEach(v -> v.setProperty(ResourceSkewedDataProperty.of(true)));
+            && !v.getExecutionProperties().containsKey(ResourceSkewedDataProperty.class))
+        .forEach(childV -> {
+          childV.getExecutionProperties().put(ResourceSkewedDataProperty.of(true));
+          dag.getDescendants(childV.getId()).forEach(descendentV -> {
+            descendentV.getExecutionProperties().put(ResourceSkewedDataProperty.of(true));
+          });
+        });
 
     return dag;
   }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/DefaultCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/DefaultCompositePass.java
index 4dc16d4..6bb119b 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/DefaultCompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/DefaultCompositePass.java
@@ -31,6 +31,7 @@ public final class DefaultCompositePass extends CompositePass {
   public DefaultCompositePass() {
     super(Arrays.asList(
         new DefaultParallelismPass(),
+        new DefaultMetricPass(),
         new DefaultEdgeEncoderPass(),
         new DefaultEdgeDecoderPass(),
         new DefaultDataStorePass(),
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
index 9d7eb29..acb8d50 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
@@ -44,14 +44,6 @@ public final class DataSkewPolicy implements Policy {
     this.policy = BUILDER.build();
   }
 
-  public DataSkewPolicy(final int skewness) {
-    this.policy = new PolicyBuilder()
-        .registerRuntimePass(new DataSkewRuntimePass().setNumSkewedKeys(skewness), new SkewCompositePass())
-        .registerCompileTimePass(new LoopOptimizationCompositePass())
-        .registerCompileTimePass(new DefaultCompositePass())
-        .build();
-  }
-
   @Override
   public DAG runCompileTimeOptimization(final DAG dag, final String dagDirectory)
       throws Exception {
diff --git a/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilderTest.java b/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilderTest.java
index f75cb8c..79191ba 100644
--- a/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilderTest.java
+++ b/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -26,19 +26,19 @@ import static org.junit.Assert.assertTrue;
 public final class PolicyBuilderTest {
   @Test
   public void testDisaggregationPolicy() {
-    assertEquals(17, DisaggregationPolicy.BUILDER.getCompileTimePasses().size());
+    assertEquals(18, DisaggregationPolicy.BUILDER.getCompileTimePasses().size());
     assertEquals(0, DisaggregationPolicy.BUILDER.getRuntimePasses().size());
   }
 
   @Test
   public void testTransientResourcePolicy() {
-    assertEquals(19, TransientResourcePolicy.BUILDER.getCompileTimePasses().size());
+    assertEquals(20, TransientResourcePolicy.BUILDER.getCompileTimePasses().size());
     assertEquals(0, TransientResourcePolicy.BUILDER.getRuntimePasses().size());
   }
 
   @Test
   public void testDataSkewPolicy() {
-    assertEquals(21, DataSkewPolicy.BUILDER.getCompileTimePasses().size());
+    assertEquals(22, DataSkewPolicy.BUILDER.getCompileTimePasses().size());
     assertEquals(1, DataSkewPolicy.BUILDER.getRuntimePasses().size());
   }
 
diff --git a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
index 871bf44..d26adaf 100644
--- a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
+++ b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
@@ -126,7 +126,6 @@ public final class JobConf extends ConfigurationModuleBuilder {
   }
 
   /**
-<<<<<<< HEAD
    * Max number of attempts for task scheduling.
    */
   @NamedParameter(doc = "Max number of task attempts", short_name = "max_task_attempt", default_value = "1")
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
index 2e2e616..049ccbe 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
@@ -43,9 +43,10 @@ public final class RuntimeIdGenerator {
    * Generates the ID for physical plan.
    *
    * @return the generated ID
+   * TODO #100: Refactor string-based RuntimeIdGenerator for IR-based DynOpt
    */
   public static String generatePhysicalPlanId() {
-    return "Plan-" + physicalPlanIdGenerator.getAndIncrement();
+    return "Plan-" + physicalPlanIdGenerator.get();
   }
 
   /**
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
index 50df799..9a8b26a 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
@@ -19,7 +19,7 @@
 package edu.snu.nemo.runtime.common.eventhandler;
 
 import edu.snu.nemo.common.eventhandler.RuntimeEvent;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
+import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 
 /**
@@ -27,25 +27,27 @@ import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
  */
 public final class DynamicOptimizationEvent implements RuntimeEvent {
   private final PhysicalPlan physicalPlan;
-  private final MetricCollectionBarrierVertex metricCollectionBarrierVertex;
+  private final Object dynOptData;
   private final String taskId;
   private final String executorId;
+  private final IREdge targetEdge;
 
   /**
    * Default constructor.
    * @param physicalPlan physical plan to be optimized.
-   * @param metricCollectionBarrierVertex metric collection barrier vertex to retrieve metric data from.
    * @param taskId id of the task which triggered the dynamic optimization.
    * @param executorId the id of executor which executes {@code taskId}
    */
   public DynamicOptimizationEvent(final PhysicalPlan physicalPlan,
-                                  final MetricCollectionBarrierVertex metricCollectionBarrierVertex,
+                                  final Object dynOptData,
                                   final String taskId,
-                                  final String executorId) {
+                                  final String executorId,
+                                  final IREdge targetEdge) {
     this.physicalPlan = physicalPlan;
-    this.metricCollectionBarrierVertex = metricCollectionBarrierVertex;
     this.taskId = taskId;
+    this.dynOptData = dynOptData;
     this.executorId = executorId;
+    this.targetEdge = targetEdge;
   }
 
   /**
@@ -56,14 +58,7 @@ public final class DynamicOptimizationEvent implements RuntimeEvent {
   }
 
   /**
-   * @return the metric collection barrier vertex for the dynamic optimization.
-   */
-  public MetricCollectionBarrierVertex getMetricCollectionBarrierVertex() {
-    return this.metricCollectionBarrierVertex;
-  }
-
-  /**
-   * @return id of the task which triggered the dynamic optimization
+   * @return id of the task which triggered the dynamic optimization.
    */
   public String getTaskId() {
     return taskId;
@@ -75,4 +70,12 @@ public final class DynamicOptimizationEvent implements RuntimeEvent {
   public String getExecutorId() {
     return executorId;
   }
+
+  public Object getDynOptData() {
+    return this.dynOptData;
+  }
+
+  public IREdge getTargetEdge() {
+    return this.targetEdge;
+  }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
index 56ce7c1..a642248 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
@@ -20,7 +20,7 @@ package edu.snu.nemo.runtime.common.eventhandler;
 
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
+import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.runtime.common.optimizer.RunTimeOptimizer;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import org.apache.reef.wake.impl.PubSubEventHandler;
@@ -50,11 +50,10 @@ public final class DynamicOptimizationEventHandler implements RuntimeEventHandle
   @Override
   public void onNext(final DynamicOptimizationEvent dynamicOptimizationEvent) {
     final PhysicalPlan physicalPlan = dynamicOptimizationEvent.getPhysicalPlan();
-    final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
-        dynamicOptimizationEvent.getMetricCollectionBarrierVertex();
+    final Object dynOptData = dynamicOptimizationEvent.getDynOptData();
+    final IREdge targetEdge = dynamicOptimizationEvent.getTargetEdge();
 
-    final PhysicalPlan newPlan = RunTimeOptimizer.dynamicOptimization(physicalPlan,
-        metricCollectionBarrierVertex);
+    final PhysicalPlan newPlan = RunTimeOptimizer.dynamicOptimization(physicalPlan, dynOptData, targetEdge);
 
     pubSubEventHandler.onNext(new UpdatePhysicalPlanEvent(newPlan, dynamicOptimizationEvent.getTaskId(),
         dynamicOptimizationEvent.getExecutorId()));
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
index e251c98..30ad958 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
@@ -16,10 +16,17 @@
 package edu.snu.nemo.runtime.common.optimizer;
 
 import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.optimizer.pass.runtime.DataSkewRuntimePass;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
 
 import java.util.*;
 
@@ -36,24 +43,27 @@ public final class RunTimeOptimizer {
   /**
    * Dynamic optimization method to process the dag with an appropriate pass, decided by the stats.
    * @param originalPlan original physical execution plan.
-   * @param metricCollectionBarrierVertex the vertex that collects metrics and chooses which optimization to perform.
    * @return the newly updated optimized physical plan.
    */
   public static synchronized PhysicalPlan dynamicOptimization(
       final PhysicalPlan originalPlan,
-      final MetricCollectionBarrierVertex metricCollectionBarrierVertex) {
-    final DynamicOptimizationProperty.Value dynamicOptimizationType =
-        metricCollectionBarrierVertex.getPropertyValue(DynamicOptimizationProperty.class).get();
+      final Object dynOptData,
+      final IREdge targetEdge) {
+    try {
+      final PhysicalPlanGenerator physicalPlanGenerator =
+          Tang.Factory.getTang().newInjector().getInstance(PhysicalPlanGenerator.class);
 
-    switch (dynamicOptimizationType) {
-      case DataSkewRuntimePass:
-        // Metric data for DataSkewRuntimePass is a pair of blockIds and map of hashrange, partition size.
-        final Pair, Map> metricData =
-            Pair.of(metricCollectionBarrierVertex.getBlockIds(),
-                (Map) metricCollectionBarrierVertex.getMetricData());
-        return new DataSkewRuntimePass().apply(originalPlan, metricData);
-      default:
-        throw new UnsupportedOperationException("Unknown runtime pass: " + dynamicOptimizationType);
+      // Data for dynamic optimization used in DataSkewRuntimePass
+      // is a map of .
+      final DAG newIrDAG =
+          new DataSkewRuntimePass()
+              .apply(originalPlan.getIrDAG(), Pair.of(targetEdge, (Map) dynOptData));
+      final DAG stageDAG = physicalPlanGenerator.apply(newIrDAG);
+      final PhysicalPlan physicalPlan =
+          new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), newIrDAG, stageDAG);
+      return physicalPlan;
+    } catch (final InjectionException e) {
+      throw new RuntimeException(e);
     }
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index 3218317..67afbad 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -16,18 +16,18 @@
 package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
 
 import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.DataSkewMetricFactory;
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
-import edu.snu.nemo.common.exception.DynamicOptimizationException;
-
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.data.KeyRange;
-import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.plan.StageEdge;
-import edu.snu.nemo.runtime.common.data.HashRange;
+
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataSkewMetricProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import edu.snu.nemo.common.KeyRange;
+import edu.snu.nemo.common.HashRange;
 import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEventHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,19 +41,24 @@ import java.util.stream.Collectors;
 * this RuntimePass identifies a number of keys with big partition sizes(skewed key)
 * and evenly redistributes data via overwriting incoming edges of destination tasks.
 */
-public final class DataSkewRuntimePass extends RuntimePass, Map>> {
+public final class DataSkewRuntimePass extends RuntimePass>> {
 private static final Logger LOG = LoggerFactory.getLogger(DataSkewRuntimePass.class.getName());
 private final Set> eventHandlers;
 // Skewed keys denote for top n keys in terms of partition size.
 public static final int DEFAULT_NUM_SKEWED_KEYS = 3;
- private int numSkewedKeys = DEFAULT_NUM_SKEWED_KEYS;
+ private int numSkewedKeys;
 /**
 * Constructor.
 */
 public DataSkewRuntimePass() {
- this.eventHandlers = Collections.singleton(
- DynamicOptimizationEventHandler.class);
+ this.eventHandlers = Collections.singleton(DynamicOptimizationEventHandler.class);
+ this.numSkewedKeys = DEFAULT_NUM_SKEWED_KEYS;
+ }
+
+ public DataSkewRuntimePass(final int numOfSkewedKeys) {
+ this();
+ this.numSkewedKeys = numOfSkewedKeys;
 }
 public DataSkewRuntimePass setNumSkewedKeys(final int numOfSkewedKeys) {
@@ -67,44 +72,32 @@ public final class DataSkewRuntimePass extends RuntimePass, Ma
 }
 @Override
- public PhysicalPlan apply(final PhysicalPlan originalPlan,
- final Pair, Map> metricData) {
- // Builder to create new stages.
- final DAGBuilder physicalDAGBuilder =
- new DAGBuilder<>(originalPlan.getStageDAG());
- final List blockIds = metricData.left();
-
+ public DAG apply(final DAG irDAG,
+ final Pair> metricData) {
 // get edges to optimize
- final List optimizationEdgeIds = blockIds.stream().map(blockId ->
- RuntimeIdGenerator.getRuntimeEdgeIdFromBlockId(blockId)).collect(Collectors.toList());
- final DAG stageDAG = originalPlan.getStageDAG();
- final List optimizationEdges = stageDAG.getVertices().stream()
- .flatMap(stage -> stageDAG.getIncomingEdgesOf(stage).stream())
- .filter(stageEdge -> optimizationEdgeIds.contains(stageEdge.getId()))
+ final List optimizationEdges = irDAG.getVertices().stream()
+ .flatMap(v -> irDAG.getIncomingEdgesOf(v).stream())
+ .filter(e -> Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
+ .equals(e.getPropertyValue(MetricCollectionProperty.class)))
 .collect(Collectors.toList());
+ final IREdge targetEdge = metricData.left();
 // Get number of evaluators of the next stage (number of blocks).
- final Integer numOfDstTasks = optimizationEdges.stream().findFirst().orElseThrow(() ->
- new RuntimeException("optimization edges are empty")).getDst().getTaskIds().size();
+ final Integer dstParallelism = targetEdge.getDst().getPropertyValue(ParallelismProperty.class).get();
 // Calculate keyRanges.
- final List keyRanges = calculateKeyRanges(metricData.right(), numOfDstTasks);
-
+ final List keyRanges = calculateKeyRanges(metricData.right(), dstParallelism);
+ final Map taskIdxToKeyRange = new HashMap<>();
+ for (int i = 0; i < dstParallelism; i++) {
+ taskIdxToKeyRange.put(i, keyRanges.get(i));
+ }
 // Overwrite the previously assigned key range in the physical DAG with the new range.
- optimizationEdges.forEach(optimizationEdge -> {
- // Update the information.
- final Map taskIdxToHashRange = new HashMap<>();
- for (int taskIdx = 0; taskIdx < numOfDstTasks; taskIdx++) {
- taskIdxToHashRange.put(taskIdx, keyRanges.get(taskIdx));
- }
- optimizationEdge.setTaskIdxToKeyRange(taskIdxToHashRange);
- });
-
- return new PhysicalPlan(originalPlan.getId(), physicalDAGBuilder.build());
+ targetEdge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(taskIdxToKeyRange)));
+ return irDAG;
 }
 public List identifySkewedKeys(final Map keyValToPartitionSizeMap) {
- // Identify skewed keyes.
+ // Identify skewed keys.
 List> sortedMetricData = keyValToPartitionSizeMap.entrySet().stream()
 .sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue()))
 .collect(Collectors.toList());
@@ -134,31 +127,31 @@ public final class DataSkewRuntimePass extends RuntimePass, Ma
 * to a key range of partitions with approximate size of (total size of partitions / the number of tasks).
 *
 * @param keyToPartitionSizeMap a map of key to partition size.
- * @param numOfDstTasks the number of tasks that receives this data as input.
+ * @param dstParallelism the number of tasks that receive this data as input.
 * @return the list of key ranges calculated.
 */
 @VisibleForTesting
 public List calculateKeyRanges(final Map keyToPartitionSizeMap,
- final Integer numOfDstTasks) {
- // Get the biggest key.
- final int maxKey = keyToPartitionSizeMap.keySet().stream()
+ final Integer dstParallelism) {
+ // Get the last key.
+ final int lastKey = keyToPartitionSizeMap.keySet().stream()
 .max(Integer::compareTo)
- .orElseThrow(() -> new DynamicOptimizationException("Cannot find max key among blocks."));
+ .get();
 // Identify skewed keys, which is top numSkewedKeys number of keys.
 List skewedKeys = identifySkewedKeys(keyToPartitionSizeMap);
 // Calculate the ideal size for each destination task.
 final Long totalSize = keyToPartitionSizeMap.values().stream().mapToLong(n -> n).sum(); // get total size
- final Long idealSizePerTask = totalSize / numOfDstTasks; // and derive the ideal size per task
+ final Long idealSizePerTask = totalSize / dstParallelism; // and derive the ideal size per task
- final List keyRanges = new ArrayList<>(numOfDstTasks);
+ final List keyRanges = new ArrayList<>(dstParallelism);
 int startingKey = 0;
 int finishingKey = 1;
 Long currentAccumulatedSize = keyToPartitionSizeMap.getOrDefault(startingKey, 0L);
 Long prevAccumulatedSize = 0L;
- for (int i = 1; i <= numOfDstTasks; i++) {
- if (i != numOfDstTasks) {
+ for (int i = 1; i <= dstParallelism; i++) {
+ if (i != dstParallelism) {
 // Ideal accumulated partition size for this task.
 final Long idealAccumulatedSize = idealSizePerTask * i;
 // By adding partition sizes, find the accumulated size nearest to the given ideal size.
@@ -185,15 +178,15 @@ public final class DataSkewRuntimePass extends RuntimePass, Ma
 prevAccumulatedSize = currentAccumulatedSize;
 startingKey = finishingKey;
 } else { // last one: we put the range of the rest.
- boolean isSkewedKey = containsSkewedKey(skewedKeys, startingKey, finishingKey);
+ boolean isSkewedKey = containsSkewedKey(skewedKeys, startingKey, lastKey + 1);
 keyRanges.add(i - 1,
- HashRange.of(startingKey, maxKey + 1, isSkewedKey));
+ HashRange.of(startingKey, lastKey + 1, isSkewedKey));
- while (finishingKey <= maxKey) {
+ while (finishingKey <= lastKey) {
 currentAccumulatedSize += keyToPartitionSizeMap.getOrDefault(finishingKey, 0L);
 finishingKey++;
 }
- LOG.debug("KeyRange {}~{}, Size {}", startingKey, maxKey + 1,
+ LOG.debug("KeyRange {}~{}, Size {}", startingKey, lastKey + 1,
 currentAccumulatedSize - prevAccumulatedSize);
 }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
index 249a239..5a8471f 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
@@ -15,9 +15,11 @@
 */
 package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
+import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.pass.Pass;
-import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import java.util.Set;
 import java.util.function.BiFunction;
@@ -28,7 +30,8 @@ import java.util.function.BiFunction;
 * after dynamic optimization.
 * @param type of the metric data used for dynamic optimization.
 */
-public abstract class RuntimePass extends Pass implements BiFunction {
+public abstract class RuntimePass extends Pass
+ implements BiFunction, T, DAG> {
 /**
 * @return the set of event handlers used with the runtime pass.
 */
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
index 30f7111..4286bca 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
@@ -16,6 +16,7 @@
 package edu.snu.nemo.runtime.common.plan;
 import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import java.io.Serializable;
@@ -27,6 +28,7 @@ import java.util.Map;
 */
 public final class PhysicalPlan implements Serializable {
 private final String id;
+ private final DAG irDAG;
 private final DAG stageDAG;
 private final Map idToIRVertex;
@@ -37,8 +39,10 @@ public final class PhysicalPlan implements Serializable {
 * @param stageDAG the DAG of stages.
 */
 public PhysicalPlan(final String id,
+ final DAG irDAG,
 final DAG stageDAG) {
 this.id = id;
+ this.irDAG = irDAG;
 this.stageDAG = stageDAG;
 idToIRVertex = new HashMap<>();
@@ -70,6 +74,13 @@ public final class PhysicalPlan implements Serializable {
 return idToIRVertex;
 }
+ /**
+ * @return IR DAG.
+ */
+ public DAG getIrDAG() {
+ return irDAG;
+ }
+
 @Override
 public String toString() {
 return stageDAG.toString();
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index ca4124b..16f4bc4 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -34,6 +34,8 @@ import edu.snu.nemo.common.exception.PhysicalPlanGenerationException;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.reef.tang.annotations.Parameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import javax.inject.Inject;
 import java.util.*;
@@ -45,6 +47,7 @@ import java.util.function.Function;
 public final class PhysicalPlanGenerator implements Function, DAG> {
 private final String dagDirectory;
 private final StagePartitioner stagePartitioner;
+ private static final Logger LOG = LoggerFactory.getLogger(PhysicalPlanGenerator.class.getName());
 /**
 * Private constructor.
@@ -147,7 +150,7 @@ public final class PhysicalPlanGenerator implements Function> stageInternalDAGBuilder = new DAGBuilder<>();
- // Prepare vertexIdtoReadables
+ // Prepare vertexIdToReadables
 final List> vertexIdToReadables = new ArrayList<>(stageParallelism);
 for (int i = 0; i < stageParallelism; i++) {
 vertexIdToReadables.add(new HashMap<>());
@@ -156,7 +159,7 @@ public final class PhysicalPlanGenerator implements Function readables = sourceVertex.getReadables(stageParallelism);
@@ -200,6 +203,12 @@ public final class PhysicalPlanGenerator implements Function { private final IRVertex dstVertex;
 /**
- * The list between the task idx and key range to read.
- */
- private Map taskIdxToKeyRange;
-
- /**
 * Value for {@link CommunicationPatternProperty}.
 */
 private final CommunicationPatternProperty.Value dataCommunicationPatternValue;
@@ -81,11 +71,6 @@ public final class StageEdge extends RuntimeEdge {
 super(runtimeEdgeId, edgeProperties, srcStage, dstStage, isSideInput);
 this.srcVertex = srcVertex;
 this.dstVertex = dstVertex;
- // Initialize the key range of each dst task.
- this.taskIdxToKeyRange = new HashMap<>();
- for (int taskIdx = 0; taskIdx < dstStage.getTaskIds().size(); taskIdx++) {
- taskIdxToKeyRange.put(taskIdx, HashRange.of(taskIdx, taskIdx + 1, false));
- }
 this.dataCommunicationPatternValue = edgeProperties.get(CommunicationPatternProperty.class)
 .orElseThrow(() -> new RuntimeException(String.format(
 "CommunicationPatternProperty not set for %s", runtimeEdgeId)));
@@ -125,22 +110,6 @@ public final class StageEdge extends RuntimeEdge {
 }
 /**
- * @return the list between the task idx and key range to read.
- */
- public Map getTaskIdxToKeyRange() {
- return taskIdxToKeyRange;
- }
-
- /**
- * Sets the task idx to key range list.
- *
- * @param taskIdxToKeyRange the list to set.
- */
- public void setTaskIdxToKeyRange(final Map taskIdxToKeyRange) {
- this.taskIdxToKeyRange = taskIdxToKeyRange;
- }
-
- /**
 * @return {@link CommunicationPatternProperty} value.
 */
 public CommunicationPatternProperty.Value getDataCommunicationPattern() {
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
index abe0a8f..88ef432 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
@@ -83,7 +83,7 @@ public final class StagePartitioner implements Function, M
 continue;
 }
 // Assign stageId
- if (testMergability(edge, irDAG)) {
+ if (testMergeability(edge, irDAG)) {
 vertexToStageIdMap.put(connectedIRVertex, stageId);
 } else {
 vertexToStageIdMap.put(connectedIRVertex, nextStageIndex.getValue());
@@ -99,7 +99,7 @@ public final class StagePartitioner implements Function, M
 * @param dag IR DAG which contains {@code edge}
 * @return {@code true} if and only if the source and the destination vertex of the edge can be merged into one stage.
 */
- private boolean testMergability(final IREdge edge, final DAG dag) {
+ private boolean testMergeability(final IREdge edge, final DAG dag) {
 // If the destination vertex has multiple inEdges, return false
 if (dag.getIncomingEdgesOf(edge.getDst()).size() > 1) {
 return false;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
index 0cfe863..3e95830 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
@@ -20,9 +20,7 @@ import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 /**
 * A Task is a self-contained executable that can be executed on a machine.
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index 59a3212..911990e 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -117,8 +117,6 @@ message BlockStateChangedMsg {
 message DataSizeMetricMsg {
 // TODO #96: Modularize DataSkewPolicy to use MetricVertex and BarrierVertex.
 repeated PartitionSizeEntry partitionSize = 1;
- required string blockId = 2;
- required string srcIRVertexId = 3;
 }
 message PartitionSizeEntry {
diff --git a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
index 319b8ba..04fca2b 100644
--- a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
+++ b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
@@ -15,8 +15,8 @@
 */
 package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
-import edu.snu.nemo.runtime.common.data.HashRange;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index ba0a71c..4cf0b12 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -26,7 +26,7 @@ import edu.snu.nemo.common.ir.edge.executionproperty.DataPersistenceProperty;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.comm.ControlMessage.ByteTransferContextDescriptor;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
@@ -296,8 +296,6 @@ public final class BlockManagerWorker {
 .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
 .setType(ControlMessage.MessageType.DataSizeMetric)
 .setDataSizeMetricMsg(ControlMessage.DataSizeMetricMsg.newBuilder()
- .setBlockId(blockId)
- .setSrcIRVertexId(srcIRVertexId)
 .addAllPartitionSize(partitionSizeEntries)
 )
 .build());
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
index e0de210..960f2c5 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.executor.data.block;
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.BlockWriteException;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
 import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index 6eef824..be42f6b 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@ -18,7 +18,7 @@ package edu.snu.nemo.runtime.executor.data.block;
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.BlockWriteException;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.executor.data.*;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
 import edu.snu.nemo.runtime.executor.data.partition.Partition;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
index 5bf1e01..7722f2d 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.executor.data.block;
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.BlockWriteException;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
 import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
index 847558f..03a7ab1 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.executor.data.block;
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.BlockWriteException;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
 import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index 4471ae4..9870259 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -16,6 +16,8 @@
 package edu.snu.nemo.runtime.executor.datatransfer;
 import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.DataSkewMetricFactory;
+import edu.snu.nemo.common.ir.edge.executionproperty.*;
 import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
@@ -23,16 +25,14 @@ import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyV
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.UnsupportedCommPatternException;
-import edu.snu.nemo.runtime.common.data.HashRange;
+import edu.snu.nemo.common.HashRange;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
@@ -45,7 +45,6 @@ import java.util.stream.StreamSupport;
 * Represents the input data transfer to a task.
 */
 public final class InputReader extends DataTransfer {
- private static final Logger LOG = LoggerFactory.getLogger(InputReader.class.getName());
 private final int dstTaskIndex;
 private final BlockManagerWorker blockManagerWorker;
@@ -118,8 +117,9 @@ public final class InputReader extends DataTransfer {
 assert (runtimeEdge instanceof StageEdge);
 final Optional dataStoreProperty = runtimeEdge.getPropertyValue(DataStoreProperty.class);
- final KeyRange hashRangeToRead =
- ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
+ final DataSkewMetricFactory metricFactory =
+ (DataSkewMetricFactory) runtimeEdge.getExecutionProperties().get(DataSkewMetricProperty.class).get();
+ final KeyRange hashRangeToRead = metricFactory.getMetric().get(dstTaskIndex);
 if (hashRangeToRead == null) {
 throw new BlockFetchException(
 new Throwable("The hash range to read is not assigned to " + dstTaskIndex + "'th task"));
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
index 8f1b7e5..61adee2 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
@@ -20,8 +20,8 @@ import edu.snu.nemo.common.coder.*;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.data.HashRange;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java
index c05745d..46c6ce9 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java
@@ -15,9 +15,9 @@
 */
 package edu.snu.nemo.runtime.executor.data;
+import edu.snu.nemo.common.HashRange;
 import edu.snu.nemo.common.coder.IntDecoderFactory;
 import edu.snu.nemo.common.coder.IntEncoderFactory;
-import edu.snu.nemo.runtime.common.data.HashRange;
 import edu.snu.nemo.runtime.executor.data.block.Block;
 import edu.snu.nemo.runtime.executor.data.block.FileBlock;
 import edu.snu.nemo.runtime.executor.data.block.NonSerializedMemoryBlock;
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index c755543..229f30d 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -15,6 +15,9 @@
 */
 package edu.snu.nemo.runtime.executor.datatransfer;
+import edu.snu.nemo.common.DataSkewMetricFactory;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.common.coder.*;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.edge.IREdge;
@@ -300,6 +303,15 @@ public final class DataTransferTest {
 dummyIREdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
 dummyIREdge.setProperty(EncoderProperty.of(ENCODER_FACTORY));
 dummyIREdge.setProperty(DecoderProperty.of(DECODER_FACTORY));
+ if (dummyIREdge.getPropertyValue(CommunicationPatternProperty.class).get()
+ .equals(CommunicationPatternProperty.Value.Shuffle)) {
+ final int parallelism = dstVertex.getPropertyValue(ParallelismProperty.class).get();
+ final Map metric = new HashMap<>();
+ for (int i = 0; i < parallelism; i++) {
+ metric.put(i, HashRange.of(i, i + 1, false));
+ }
+ dummyIREdge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(metric)));
+ }
 final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
 final RuntimeEdge dummyEdge;
@@ -383,6 +395,15 @@ public final class DataTransferTest {
 = dummyIREdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
 duplicateDataProperty.get().setRepresentativeEdgeId(edgeId);
 duplicateDataProperty.get().setGroupSize(2);
+ if (dummyIREdge.getPropertyValue(CommunicationPatternProperty.class).get()
+ .equals(CommunicationPatternProperty.Value.Shuffle)) {
+ final int parallelism = dstVertex.getPropertyValue(ParallelismProperty.class).get();
+ final Map metric = new HashMap<>();
+ for (int i = 0; i < parallelism; i++) {
+ metric.put(i, HashRange.of(i, i + 1, false));
+ }
+ dummyIREdge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(metric)));
+ }
 dummyIREdge.setProperty(DataStoreProperty.of(store));
 dummyIREdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
 final RuntimeEdge dummyEdge, dummyEdge2;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DataSkewDynOptDataHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DataSkewDynOptDataHandler.java
new file mode 100644
index 0000000..47671ed
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DataSkewDynOptDataHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master;
+
+import edu.snu.nemo.runtime.common.comm.ControlMessage;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Handler for aggregating data used in data skew dynamic optimization.
+ */
+public class DataSkewDynOptDataHandler implements DynOptDataHandler {
+ private final Map aggregatedDynOptData;
+
+ public DataSkewDynOptDataHandler() {
+ this.aggregatedDynOptData = new HashMap<>();
+ }
+
+ /**
+ * Updates data for dynamic optimization sent from Tasks.
+ * @param dynOptData data used for data skew dynamic optimization.
+ */
+ @Override
+ public final void updateDynOptData(final Object dynOptData) {
+ List partitionSizeInfo
+ = (List) dynOptData;
+ partitionSizeInfo.forEach(partitionSizeEntry -> {
+ final int hashIndex = partitionSizeEntry.getKey();
+ final long partitionSize = partitionSizeEntry.getSize();
+ if (aggregatedDynOptData.containsKey(hashIndex)) {
+ aggregatedDynOptData.compute(hashIndex, (originalKey, originalValue) -> originalValue + partitionSize);
+ } else {
+ aggregatedDynOptData.put(hashIndex, partitionSize);
+ }
+ });
+ }
+
+ /**
+ * Returns aggregated data for dynamic optimization.
+ * @return aggregated data used for data skew dynamic optimization.
+ */
+ @Override
+ public final Object getDynOptData() {
+ return aggregatedDynOptData;
+ }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DynOptDataHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DynOptDataHandler.java
new file mode 100644
index 0000000..66fa310
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DynOptDataHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master;
+
+/**
+ * Handler for aggregating data used in dynamic optimization.
+ */
+public interface DynOptDataHandler {
+ /**
+ * Updates data for dynamic optimization sent from Tasks.
+ * @param dynOptData data used for dynamic optimization.
+ */
+ void updateDynOptData(Object dynOptData);
+
+ /**
+ * Returns aggregated data for dynamic optimization.
+ * @return aggregated data used for dynamic optimization.
+ */
+ Object getDynOptData();
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 41ad243..d1fa2df 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -19,13 +19,13 @@ import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.exception.*;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageContext;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageListener;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.state.TaskState;
+import edu.snu.nemo.runtime.master.scheduler.BatchScheduler;
 import edu.snu.nemo.runtime.master.servlet.*;
 import edu.snu.nemo.runtime.master.resource.ContainerManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
@@ -82,8 +82,6 @@ public final class RuntimeMaster {
 private final MetricMessageHandler metricMessageHandler;
 private final MessageEnvironment masterMessageEnvironment;
 private final MetricStore metricStore;
- private final Map aggregatedMetricData;
- private final ExecutorService metricAggregationService;
 private final ClientRPC clientRPC;
 private final MetricManagerMaster metricManagerMaster;
 // For converting json data. This is a thread safe.
@@ -123,8 +121,6 @@ public final class RuntimeMaster { this.irVertices = new HashSet<>(); this.resourceRequestCount = new AtomicInteger(0); this.objectMapper = new ObjectMapper(); - this.aggregatedMetricData = new ConcurrentHashMap<>(); - this.metricAggregationService = Executors.newFixedThreadPool(10); this.metricStore = MetricStore.getStore(); this.metricServer = startRestMetricServer(); } @@ -343,10 +339,8 @@ public final class RuntimeMaster { LOG.error(failedExecutorId + " failed, Stack Trace: ", exception); throw new RuntimeException(exception); case DataSizeMetric: - final ControlMessage.DataSizeMetricMsg dataSizeMetricMsg = message.getDataSizeMetricMsg(); // TODO #96: Modularize DataSkewPolicy to use MetricVertex and BarrierVertex. - accumulateBarrierMetric(dataSizeMetricMsg.getPartitionSizeList(), - dataSizeMetricMsg.getSrcIRVertexId(), dataSizeMetricMsg.getBlockId()); + ((BatchScheduler) scheduler).updateDynOptData(message.getDataSizeMetricMsg().getPartitionSizeList()); break; case MetricMessageReceived: final List metricList = message.getMetricMsg().getMetricList(); @@ -371,45 +365,6 @@ public final class RuntimeMaster { } } - /** - * Accumulates the metric data for a barrier vertex. - * TODO #96: Modularize DataSkewPolicy to use MetricVertex and BarrierVertex. - * TODO #98: Implement MetricVertex that collect metric used for dynamic optimization. - * - * @param partitionSizeInfo the size of partitions in a block to accumulate. - * @param srcVertexId the ID of the source vertex. - * @param blockId the ID of the block. 
- */ - private void accumulateBarrierMetric(final List partitionSizeInfo, - final String srcVertexId, - final String blockId) { - final IRVertex vertexToSendMetricDataTo = irVertices.stream() - .filter(irVertex -> irVertex.getId().equals(srcVertexId)).findFirst() - .orElseThrow(() -> new RuntimeException(srcVertexId + " doesn't exist in the submitted Physical Plan")); - - if (vertexToSendMetricDataTo instanceof MetricCollectionBarrierVertex) { - final MetricCollectionBarrierVertex metricCollectionBarrierVertex = - (MetricCollectionBarrierVertex) vertexToSendMetricDataTo; - - metricCollectionBarrierVertex.addBlockId(blockId); - metricAggregationService.submit(() -> { - // For each hash range index, we aggregate the metric data. - partitionSizeInfo.forEach(partitionSizeEntry -> { - final int key = partitionSizeEntry.getKey(); - final long size = partitionSizeEntry.getSize(); - if (aggregatedMetricData.containsKey(key)) { - aggregatedMetricData.compute(key, (existKey, existValue) -> existValue + size); - } else { - aggregatedMetricData.put(key, size); - } - }); - metricCollectionBarrierVertex.setMetricData(aggregatedMetricData); - }); - } else { - throw new RuntimeException("Something wrong happened at SkewCompositePass."); - } - } - private static TaskState.State convertTaskState(final ControlMessage.TaskStateFromExecutor state) { switch (state) { case READY: diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java index d0e21d8..d5c2e89 100644 --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java @@ -44,7 +44,6 @@ import java.util.stream.Stream; */ @NotThreadSafe public final class ExecutorRepresenter { - private final String executorId; private final ResourceSpecification 
resourceSpecification; private final Map runningComplyingTasks; @@ -107,7 +106,6 @@ public final class ExecutorRepresenter { ? runningComplyingTasks : runningNonComplyingTasks).put(task.getTaskId(), task); runningTaskToAttempt.put(task, task.getAttemptIdx()); failedTasks.remove(task); - serializationExecutorService.submit(() -> { final byte[] serialized = SerializationUtils.serialize(task); sendControlMessage( diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java index 24c0e43..77881c0 100644 --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java @@ -17,18 +17,20 @@ package edu.snu.nemo.runtime.master.scheduler; import com.google.common.collect.Sets; import edu.snu.nemo.common.Pair; -import edu.snu.nemo.common.dag.DAG; import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper; import edu.snu.nemo.common.ir.Readable; +import edu.snu.nemo.common.ir.edge.IREdge; +import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty; import edu.snu.nemo.common.ir.vertex.IRVertex; import edu.snu.nemo.runtime.common.RuntimeIdGenerator; import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEvent; import edu.snu.nemo.runtime.common.plan.*; import edu.snu.nemo.runtime.common.state.BlockState; import edu.snu.nemo.runtime.common.state.TaskState; +import edu.snu.nemo.runtime.master.DataSkewDynOptDataHandler; +import edu.snu.nemo.runtime.master.DynOptDataHandler; import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler; import edu.snu.nemo.common.exception.*; -import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex; import edu.snu.nemo.runtime.common.state.StageState; import edu.snu.nemo.runtime.master.BlockManagerMaster; import 
edu.snu.nemo.runtime.master.PlanStateManager; @@ -75,6 +77,7 @@ public final class BatchScheduler implements Scheduler { private PhysicalPlan physicalPlan; private PlanStateManager planStateManager; private List> sortedScheduleGroups; + private List<DynOptDataHandler> dynOptDataHandlers; @Inject private BatchScheduler(final TaskDispatcher taskDispatcher, @@ -93,6 +96,8 @@ public final class BatchScheduler implements Scheduler { .subscribe(updatePhysicalPlanEventHandler.getEventClass(), updatePhysicalPlanEventHandler); } this.executorRegistry = executorRegistry; + this.dynOptDataHandlers = new ArrayList<>(); + dynOptDataHandlers.add(new DataSkewDynOptDataHandler()); } /** @@ -130,14 +135,12 @@ public final class BatchScheduler implements Scheduler { * Handles task state transition notifications sent from executors. * Note that we can receive notifications for previous task attempts, due to the nature of asynchronous events. * We ignore such late-arriving notifications, and only handle notifications for the current task attempt. - * * @param executorId the id of the executor where the message was sent from. * @param taskId whose state has changed * @param taskAttemptIndex of the task whose state has changed * @param newState the state to change to * @param vertexPutOnHold the ID of vertex that is put on hold. It is null otherwise.
*/ - @Override public void onTaskStateReportFromExecutor(final String executorId, final String taskId, final int taskAttemptIndex, @@ -158,7 +161,7 @@ public final class BatchScheduler implements Scheduler { onTaskExecutionFailedRecoverable(executorId, taskId, failureCause); break; case ON_HOLD: - onTaskExecutionOnHold(executorId, taskId, vertexPutOnHold); + onTaskExecutionOnHold(executorId, taskId); break; case FAILED: throw new UnrecoverableFailureException(new Exception(new StringBuffer().append("The plan failed on Task #") @@ -364,15 +367,34 @@ public final class BatchScheduler implements Scheduler { }); } + public IREdge getEdgeToOptimize(final String taskId) { + // Get a stage including the given task + final Stage stagePutOnHold = physicalPlan.getStageDAG().getVertices().stream() + .filter(stage -> stage.getTaskIds().contains(taskId)).findFirst().get(); + + // Get outgoing edges of that stage with MetricCollectionProperty + List<StageEdge> stageEdges = physicalPlan.getStageDAG().getOutgoingEdgesOf(stagePutOnHold); + IREdge targetEdge = null; + for (StageEdge edge : stageEdges) { + final IRVertex srcIRVertex = edge.getSrcIRVertex(); + final IRVertex dstIRVertex = edge.getDstIRVertex(); + targetEdge = physicalPlan.getIrDAG().getEdgeBetween(srcIRVertex.getId(), dstIRVertex.getId()); + if (MetricCollectionProperty.Value.DataSkewRuntimePass + .equals(targetEdge.getPropertyValue(MetricCollectionProperty.class).get())) { + break; + } + } + + return targetEdge; + } + /** * Action for after task execution is put on hold. * @param executorId the ID of the executor. * @param taskId the ID of the task. - * @param vertexPutOnHold the ID of vertex that is put on hold.
*/ private void onTaskExecutionOnHold(final String executorId, - final String taskId, - final String vertexPutOnHold) { + final String taskId) { LOG.info("{} put on hold in {}", new Object[]{taskId, executorId}); executorRegistry.updateExecutor(executorId, (executor, state) -> { executor.onTaskExecutionComplete(taskId); @@ -383,21 +405,18 @@ public final class BatchScheduler implements Scheduler { final boolean stageComplete = planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE); + final IREdge targetEdge = getEdgeToOptimize(taskId); + if (targetEdge == null) { + throw new RuntimeException("No edges specified for data skew optimization"); + } + if (stageComplete) { - // get optimization vertex from the task. - final MetricCollectionBarrierVertex metricCollectionBarrierVertex = - getVertexDagById(taskId).getVertices().stream() // get vertex list - .filter(irVertex -> irVertex.getId().equals(vertexPutOnHold)) // find it - .filter(irVertex -> irVertex instanceof MetricCollectionBarrierVertex) - .distinct() - .map(irVertex -> (MetricCollectionBarrierVertex) irVertex) // convert types - .findFirst().orElseThrow(() -> new RuntimeException(TaskState.State.ON_HOLD.name() // get it - + " called with failed task ids by some other task than " - + MetricCollectionBarrierVertex.class.getSimpleName())); - // and we will use this vertex to perform metric collection and dynamic optimization. 
- - pubSubEventHandlerWrapper.getPubSubEventHandler().onNext( - new DynamicOptimizationEvent(physicalPlan, metricCollectionBarrierVertex, taskId, executorId)); + final DynOptDataHandler dynOptDataHandler = dynOptDataHandlers.stream() + .filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler) + .findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!")); + pubSubEventHandlerWrapper.getPubSubEventHandler() + .onNext(new DynamicOptimizationEvent(physicalPlan, dynOptDataHandler.getDynOptData(), + taskId, executorId, targetEdge)); } } @@ -478,16 +497,10 @@ public final class BatchScheduler implements Scheduler { .collect(Collectors.toSet()); } - /** - * @param taskId id of the task - * @return the IR dag - */ - private DAG> getVertexDagById(final String taskId) { - for (final Stage stage : physicalPlan.getStageDAG().getVertices()) { - if (stage.getId().equals(RuntimeIdGenerator.getStageIdFromTaskId(taskId))) { - return stage.getIRDAG(); - } - } - throw new RuntimeException("This taskId does not exist in the plan"); + public void updateDynOptData(final Object dynOptData) { + final DynOptDataHandler dynOptDataHandler = dynOptDataHandlers.stream() + .filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler) + .findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!")); + dynOptDataHandler.updateDynOptData(dynOptData); } } diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java index ff3986e..a2c0935 100644 --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java @@ -51,7 +51,7 @@ public final class NodeShareSchedulingConstraint implements 
SchedulingConstraint @Override public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) { final Map propertyValue = task.getPropertyValue(ResourceSiteProperty.class) - .orElseThrow(() -> new RuntimeException("ResourceSiteProperty expected")); + .orElseThrow(() -> new RuntimeException("ResourceSiteProperty expected")); if (propertyValue.isEmpty()) { return true; } diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java index 4a774bf..50dd8d5 100644 --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java @@ -45,8 +45,8 @@ public final class SchedulingConstraintRegistry { registerSchedulingConstraint(containerTypeAwareSchedulingConstraint); registerSchedulingConstraint(freeSlotSchedulingConstraint); registerSchedulingConstraint(sourceLocationAwareSchedulingConstraint); - registerSchedulingConstraint(nodeShareSchedulingConstraint); registerSchedulingConstraint(skewnessAwareSchedulingConstraint); + registerSchedulingConstraint(nodeShareSchedulingConstraint); } /** diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java index 236453f..6e5ba4c 100644 --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java @@ -16,11 +16,13 @@ package edu.snu.nemo.runtime.master.scheduler; import com.google.common.annotations.VisibleForTesting; +import 
edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.DataSkewMetricProperty; import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty; import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceSkewedDataProperty; import edu.snu.nemo.runtime.common.RuntimeIdGenerator; -import edu.snu.nemo.runtime.common.data.HashRange; -import edu.snu.nemo.runtime.common.data.KeyRange; +import edu.snu.nemo.common.HashRange; +import edu.snu.nemo.common.KeyRange; import edu.snu.nemo.runtime.common.plan.StageEdge; import edu.snu.nemo.runtime.common.plan.Task; import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; @@ -28,6 +30,7 @@ import org.apache.reef.annotations.audience.DriverSide; import javax.annotation.concurrent.ThreadSafe; import javax.inject.Inject; +import java.util.Map; /** * This policy aims to distribute partitions with skewed keys to different executors. @@ -36,7 +39,6 @@ import javax.inject.Inject; @DriverSide @AssociatedProperty(ResourceSkewedDataProperty.class) public final class SkewnessAwareSchedulingConstraint implements SchedulingConstraint { - @VisibleForTesting @Inject public SkewnessAwareSchedulingConstraint() { @@ -45,9 +47,14 @@ public final class SkewnessAwareSchedulingConstr public boolean hasSkewedData(final Task task) { final int taskIdx = RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId()); for (StageEdge inEdge : task.getTaskIncomingEdges()) { - final KeyRange hashRange = inEdge.getTaskIdxToKeyRange().get(taskIdx); - if (((HashRange) hashRange).isSkewed()) { - return true; + if (CommunicationPatternProperty.Value.Shuffle + .equals(inEdge.getDataCommunicationPattern())) { + final Map<Integer, KeyRange> taskIdxToKeyRange = + inEdge.getPropertyValue(DataSkewMetricProperty.class).get().getMetric(); + final KeyRange hashRange = taskIdxToKeyRange.get(taskIdx); + if (((HashRange) hashRange).isSkewed()) { + return true; + } + } } } return false;
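
The revised `hasSkewedData` consults the skew metric only on shuffle edges, so non-shuffle incoming edges are skipped before any key range is looked up. Below is a minimal, self-contained sketch of that control flow. `SkewCheckSketch`, `Pattern`, `Range` and `Edge` are hypothetical stand-ins for `CommunicationPatternProperty.Value`, `HashRange` and `StageEdge`, not Nemo classes, and the `null` guard on a missing range is an added assumption (the committed code calls `Optional.get()` and casts directly).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class SkewCheckSketch {
  // Hypothetical stand-in for CommunicationPatternProperty.Value.
  public enum Pattern { SHUFFLE, ONE_TO_ONE, BROADCAST }

  // Hypothetical stand-in for HashRange: only the skew flag matters here.
  public static final class Range {
    private final boolean skewed;

    public Range(final boolean skewed) {
      this.skewed = skewed;
    }

    public boolean isSkewed() {
      return skewed;
    }
  }

  // Hypothetical stand-in for StageEdge: a communication pattern plus a
  // task-index-to-range map that is only expected on shuffle edges.
  public static final class Edge {
    final Pattern pattern;
    final Map<Integer, Range> taskIdxToRange;

    public Edge(final Pattern pattern, final Map<Integer, Range> taskIdxToRange) {
      this.pattern = pattern;
      this.taskIdxToRange = taskIdxToRange;
    }
  }

  // Same shape as the new hasSkewedData: filter on the pattern first,
  // then look up this task's range in the edge's skew metric.
  public static boolean hasSkewedData(final int taskIdx, final List<Edge> inEdges) {
    for (final Edge inEdge : inEdges) {
      if (inEdge.pattern == Pattern.SHUFFLE) {
        final Range range = inEdge.taskIdxToRange.get(taskIdx);
        if (range != null && range.isSkewed()) {
          return true;
        }
      }
    }
    return false;
  }

  public static void main(final String[] args) {
    // Two of three ranges are skewed, as in the updated unit test.
    final Map<Integer, Range> ranges = new HashMap<>();
    ranges.put(0, new Range(true));
    ranges.put(1, new Range(true));
    ranges.put(2, new Range(false));
    final Edge shuffle = new Edge(Pattern.SHUFFLE, ranges);
    // A non-shuffle edge carries no metric map; it must never be dereferenced.
    final Edge oneToOne = new Edge(Pattern.ONE_TO_ONE, null);

    System.out.println(hasSkewedData(0, Arrays.asList(oneToOne, shuffle)));
    System.out.println(hasSkewedData(2, Arrays.asList(oneToOne, shuffle)));
    System.out.println(hasSkewedData(0, Arrays.asList(oneToOne)));
  }
}
```

Running `main` prints `true`, `false`, `false`: task 0 reads a skewed range from the shuffle edge, task 2 reads a non-skewed one, and an input made only of a one-to-one edge is never treated as skewed.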
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java index 6c0222c..e6feaa5 100644 --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java @@ -131,7 +131,6 @@ final class TaskDispatcher { planStateManager.onTaskStateChanged(task.getTaskId(), TaskState.State.EXECUTING); LOG.info("{} scheduled to {}", task.getTaskId(), selectedExecutor.getExecutorId()); - // send the task selectedExecutor.onTaskScheduled(task); } else { diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java index 87c933f..e3439d6 100644 --- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java +++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java @@ -15,9 +15,16 @@ */ package edu.snu.nemo.runtime.master.scheduler; +import edu.snu.nemo.common.DataSkewMetricFactory; +import edu.snu.nemo.common.ir.edge.IREdge; +import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.DataSkewMetricProperty; +import edu.snu.nemo.common.ir.vertex.IRVertex; import edu.snu.nemo.runtime.common.RuntimeIdGenerator; -import edu.snu.nemo.runtime.common.data.HashRange; -import edu.snu.nemo.runtime.common.data.KeyRange; +import edu.snu.nemo.common.HashRange; +import edu.snu.nemo.common.KeyRange; +import edu.snu.nemo.runtime.common.plan.Stage; import edu.snu.nemo.runtime.common.plan.StageEdge; import 
edu.snu.nemo.runtime.common.plan.Task; import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; @@ -36,27 +43,36 @@ import static org.mockito.Mockito.when; * Test cases for {@link SkewnessAwareSchedulingConstraint}. */ @RunWith(PowerMockRunner.class) -@PrepareForTest({ExecutorRepresenter.class, Task.class, HashRange.class, StageEdge.class}) +@PrepareForTest({ExecutorRepresenter.class, Task.class, Stage.class, HashRange.class, +IRVertex.class, IREdge.class}) public final class SkewnessAwareSchedulingConstraintTest { - private static StageEdge mockStageEdge() { + private static StageEdge mockStageEdge(final int numSkewedHashRange, + final int numTotalHashRange) { final Map<Integer, KeyRange> taskIdxToKeyRange = new HashMap<>(); - final HashRange skewedHashRange1 = mock(HashRange.class); - when(skewedHashRange1.isSkewed()).thenReturn(true); - final HashRange skewedHashRange2 = mock(HashRange.class); - when(skewedHashRange2.isSkewed()).thenReturn(true); - final HashRange hashRange = mock(HashRange.class); - when(hashRange.isSkewed()).thenReturn(false); + for (int taskIdx = 0; taskIdx < numTotalHashRange; taskIdx++) { + final HashRange hashRange = mock(HashRange.class); + if (taskIdx < numSkewedHashRange) { + when(hashRange.isSkewed()).thenReturn(true); + } else { + when(hashRange.isSkewed()).thenReturn(false); + } + taskIdxToKeyRange.put(taskIdx, hashRange); + } - taskIdxToKeyRange.put(0, skewedHashRange1); - taskIdxToKeyRange.put(1, skewedHashRange2); - taskIdxToKeyRange.put(2, hashRange); + final IRVertex srcMockVertex = mock(IRVertex.class); + final IRVertex dstMockVertex = mock(IRVertex.class); + final Stage srcMockStage = mock(Stage.class); + final Stage dstMockStage = mock(Stage.class); - final StageEdge inEdge = mock(StageEdge.class); - when(inEdge.getTaskIdxToKeyRange()).thenReturn(taskIdxToKeyRange); + final IREdge dummyIREdge = new IREdge(CommunicationPatternProperty.Value.Shuffle, srcMockVertex, dstMockVertex); +
dummyIREdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull)); + dummyIREdge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(taskIdxToKeyRange))); + final StageEdge dummyEdge = new StageEdge("Edge-0", dummyIREdge.getExecutionProperties(), + srcMockVertex, dstMockVertex, srcMockStage, dstMockStage, false); - return inEdge; + return dummyEdge; } private static Task mockTask(final int taskIdx, final List<StageEdge> inEdges) { @@ -81,11 +97,12 @@ public final class SkewnessAwareSchedulingConstraintTest { @Test public void testScheduleSkewedTasks() { final SchedulingConstraint schedulingConstraint = new SkewnessAwareSchedulingConstraint(); - final StageEdge inEdge = mockStageEdge(); - final Task task0 = mockTask(0, Arrays.asList(inEdge)); - final Task task1 = mockTask(1, Arrays.asList(inEdge)); - final Task task2 = mockTask(2, Arrays.asList(inEdge)); - final ExecutorRepresenter e0 = mockExecutorRepresenter(task0); + // Create a StageEdge where two out of three are skewed hash ranges.
+ final StageEdge inEdge = mockStageEdge(2, 3); + final Task task0 = mockTask(0, Arrays.asList(inEdge)); // skewed task + final Task task1 = mockTask(1, Arrays.asList(inEdge)); // skewed task + final Task task2 = mockTask(2, Arrays.asList(inEdge)); // non-skewed task + final ExecutorRepresenter e0 = mockExecutorRepresenter(task0); // schedule skewed task to e0 assertEquals(true, schedulingConstraint.testSchedulability(e0, task2)); assertEquals(false, schedulingConstraint.testSchedulability(e0, task1)); diff --git a/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java b/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java index 3b3646e..3c23dcf 100644 --- a/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java +++ b/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java @@ -95,7 +95,7 @@ public final class TestPlanGenerator { final Policy policy) throws Exception { final DAG optimized = policy.runCompileTimeOptimization(irDAG, EMPTY_DAG_DIRECTORY); final DAG physicalDAG = PLAN_GENERATOR.apply(optimized); - return new PhysicalPlan("TestPlan", physicalDAG); + return new PhysicalPlan("TestPlan", irDAG, physicalDAG); } /**