nemo-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] johnyangk commented on a change in pull request #193: [NEMO-338] SkewSamplingPass
Date Wed, 20 Feb 2019 07:27:16 GMT
johnyangk commented on a change in pull request #193: [NEMO-338] SkewSamplingPass
URL: https://github.com/apache/incubator-nemo/pull/193#discussion_r258359366
 
 

 ##########
 File path: compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SamplingSkewReshapingPass.java
 ##########
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping;
+
+import org.apache.nemo.common.KeyExtractor;
+import org.apache.nemo.common.dag.Edge;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.*;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.utility.MessageAggregatorVertex;
+import org.apache.nemo.common.ir.vertex.utility.MessageBarrierVertex;
+import org.apache.nemo.common.ir.vertex.utility.SamplingVertex;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * Optimizes the PartitionSet property of shuffle edges to handle data skews using the SamplingVertex.
+ *
+ * This pass effectively partitions the IRDAG by non-oneToOne edges, clones each subDAG partition
using SamplingVertex
+ * to process sampled data, and executes each cloned partition prior to executing the corresponding
original partition.
+ *
+ * Suppose the IRDAG is partitioned into two sub-DAGs as follows:
+ * P1 - P2
+ *
+ * Then, this pass will produce something like:
+ * P1' - P1 - P2
+ *          - P2' - P2
+ * where Px' consists of SamplingVertex objects that clone the execution of Px.
+ *
+ * For each Px' this pass also inserts a MessageBarrierVertex, to use its data statistics
for dynamically optimizing
+ * the execution behaviors of Px.
+ */
+@Requires(CommunicationPatternProperty.class)
+public final class SamplingSkewReshapingPass extends ReshapingPass {
+  private static final Logger LOG = LoggerFactory.getLogger(SamplingSkewReshapingPass.class.getName());
+  private static final float SAMPLE_RATE = 0.1f;
+
+  /**
+   * Default constructor.
+   */
+  public SamplingSkewReshapingPass() {
+    super(SamplingSkewReshapingPass.class);
+  }
+
+  @Override
+  public IRDAG apply(final IRDAG dag) {
+    dag.topologicalDo(v -> {
+      for (final IREdge e : dag.getIncomingEdgesOf(v)) {
+        if (CommunicationPatternProperty.Value.Shuffle.equals(
+          e.getPropertyValue(CommunicationPatternProperty.class).get())) {
+          // Compute the partition and its source vertices
+          final IRVertex shuffleWriter = e.getSrc();
+          final Set<IRVertex> partitionAll = recursivelyBuildPartition(shuffleWriter,
dag);
+          final Set<IRVertex> partitionSources = partitionAll.stream().filter(vertexInPartition
->
+            !dag.getIncomingEdgesOf(vertexInPartition).stream()
+              .map(Edge::getSrc)
+              .anyMatch(partitionAll::contains)
+          ).collect(Collectors.toSet());
+
+          // Insert sampling vertices.
+          final Set<SamplingVertex> samplingVertices = partitionAll
+            .stream()
+            .map(vertexInPartition -> new SamplingVertex(vertexInPartition, SAMPLE_RATE))
+            .collect(Collectors.toSet());
+          dag.insert(samplingVertices, partitionSources);
+
+          // Insert the message vertex.
+          // We first obtain a clonedShuffleEdge to analyze the data statistics of the shuffle
outputs of
+          // the sampling vertex right before shuffle.
+          final SamplingVertex rightBeforeShuffle = samplingVertices.stream()
+            .filter(sv -> sv.getOriginalVertex().equals(e.getSrc()))
+            .findFirst()
+            .orElseThrow(() -> new IllegalStateException());
+          final IREdge clonedShuffleEdge = rightBeforeShuffle.getCloneOfOriginalEdge(e);
+
+          final KeyExtractor keyExtractor = e.getPropertyValue(KeyExtractorProperty.class).get();
 
 Review comment:
   Thanks for bringing this up!
   
   (1) Regarding pipelining MessageBarrierVertex within a single stage with parent sampling
vertices:
   
   I've changed the semantics of insert() to use SamplingVertex(NewVertex) instead of the
NewVertex, if an existing vertex that the NewVertex will connect to is a SamplingVertex. I
think this is a reasonable assumption as new vertices that consume outputs from sampling vertices
will process a subset of data anyways, and no such new vertex will  reach the original DAG
except via control edges. With this change Nemo is able to pipeline the MessageBarrierVertex
(wrapped inside a SamplingVertex), avoiding duplicate data materialization.
   
   (2) Regarding connecting the message aggregation vertex to the partition Sources:
   
   I'd prefer not to do this, at least considering current use cases we have. 
   
   Here's the physical DAG diagram of PerKeyMedianITCase#testLargeShuffleSamplingSkew  (including
the fix for (1))
   https://nemo.snuspl.snu.ac.kr:50443/nemo-dag-out/7a1c136ac24f427ebb4c34a43712da3f.svg
   
   In the diagram the ScheduleGroup property is set such that the sampling partition does
always execute prior to the original partition. In particular the ordering ScheduleGroup0(Stage3+Stage5)
==> ScheduleGroup1(Stage4+Stage6) is enforced although Stage4 ==> Stage6 is PUSH (which
makes sense when considering each schedule group as a big vertex). The sampling should also
happen prior to the execution of the original partition when Stage4 ==> Stage6 is PULL
as well, although the schedule groups may differ in this case. I think this shows that a sequence
of insert(samplingVertex) and insert(messageVertex) captures our intention fairly well.
   
   I did write some code to try to 'extend' the control edges from (sampling vertices) to
(existing vertices), by adding new control edges from (new vertices that connect to sampling
vertices) to (existing vertices) upon each insert(). However, I ultimately I felt that this
approach complicates the code quite a bit, and reverted the code back to the current approach
which I think works for the current use cases.
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message