nemo-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sanha commented on a change in pull request #193: [NEMO-338] SkewSamplingPass
Date Tue, 19 Feb 2019 10:29:22 GMT
sanha commented on a change in pull request #193: [NEMO-338] SkewSamplingPass
URL: https://github.com/apache/incubator-nemo/pull/193#discussion_r257972283
 
 

 ##########
 File path: common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
 ##########
 @@ -151,73 +176,153 @@ public void insert(final StreamVertex streamVertex, final IREdge edgeToStreamize
    *        shuffleEdge - messageAggregatorVertex - broadcastEdge - dst
    * (the "Before" relationships are unmodified)
    *
+   * This preserves semantics as the results of the inserted message vertices are never consumed by the original IRDAG.
+   *
    * @param messageBarrierVertex to insert.
    * @param messageAggregatorVertex to insert.
    * @param mbvOutputEncoder to use.
    * @param mbvOutputDecoder to use.
    * @param edgesToGetStatisticsOf to examine.
+   * @param edgesToOptimize to optimize.
    */
   public void insert(final MessageBarrierVertex messageBarrierVertex,
                      final MessageAggregatorVertex messageAggregatorVertex,
                      final EncoderProperty mbvOutputEncoder,
                      final DecoderProperty mbvOutputDecoder,
-                     final Set<IREdge> edgesToGetStatisticsOf) {
-    if (edgesToGetStatisticsOf.stream().map(edge -> edge.getDst().getId()).collect(Collectors.toSet()).size()
!= 1) {
-      throw new IllegalArgumentException("Not destined to the same vertex: " + edgesToGetStatisticsOf.toString());
-    }
-    final IRVertex dst = edgesToGetStatisticsOf.iterator().next().getDst();
-
+                     final Set<IREdge> edgesToGetStatisticsOf,
+                     final Set<IREdge> edgesToOptimize) {
     // Create a completely new DAG with the vertex inserted.
-    final DAGBuilder builder = new DAGBuilder();
+    final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
 
-    // Current metric collection id.
-    final int currentMetricCollectionId = metricCollectionId.incrementAndGet();
+    // All of the existing vertices and edges remain intact
+    modifiedDAG.topologicalDo(v -> {
+      builder.addVertex(v);
+      modifiedDAG.getIncomingEdgesOf(v).forEach(builder::connectVertices);
+    });
 
-    // First, add all the vertices.
-    modifiedDAG.topologicalDo(v -> builder.addVertex(v));
+    ////////////////////////////////// STEP 1: Insert new vertices and edges
 
+    // From mav to dst
     // Add a control dependency (no output) from the messageAggregatorVertex to the destination.
     builder.addVertex(messageAggregatorVertex);
-    final IREdge noDataEdge = new IREdge(CommunicationPatternProperty.Value.BroadCast, messageAggregatorVertex,
dst);
-    builder.connectVertices(noDataEdge);
+    final IRVertex dst = edgesToGetStatisticsOf.iterator().next().getDst();
+    builder.connectVertices(Util.createControlEdge(messageAggregatorVertex, dst));
+
+    // Build the edges: src - mbv - mav
+    for (final IREdge edge : edgesToGetStatisticsOf) {
+      final MessageBarrierVertex mbv = new MessageBarrierVertex<>(messageBarrierVertex.getMessageFunction());
+      builder.addVertex(mbv);
+
+      // From src to mbv
+      final IREdge clone = Util.cloneEdge(CommunicationPatternProperty.Value.OneToOne, edge,
edge.getSrc(), mbv);
+      builder.connectVertices(clone);
+
+      // From mbv to mav
+      final IREdge edgeToABV = edgeBetweenMessageVertices(
+        mbv, messageAggregatorVertex, mbvOutputEncoder, mbvOutputDecoder);
+      builder.connectVertices(edgeToABV);
+    }
 
-    // Add the edges and the messageBarrierVertex.
+    ////////////////////////////////// STEP 2: Annotate the MessageId on optimization target edges
+
+    if (edgesToOptimize.stream().map(edge -> edge.getDst().getId()).collect(Collectors.toSet()).size()
!= 1) {
+      throw new IllegalArgumentException("Not destined to the same vertex: " + edgesToOptimize.toString());
+    }
     modifiedDAG.topologicalDo(v -> {
-      for (final IREdge edge : modifiedDAG.getIncomingEdgesOf(v)) {
-        if (edgesToGetStatisticsOf.contains(edge)) {
-          // MATCH!
-          final MessageBarrierVertex mbv = new MessageBarrierVertex<>(messageBarrierVertex.getMessageFunction());
-          builder.addVertex(mbv);
-
-          // Clone the edgeToGetStatisticsOf
-          final IREdge clone = new IREdge(CommunicationPatternProperty.Value.OneToOne, edge.getSrc(),
mbv);
-          clone.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get()));
-          clone.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get()));
-          edge.getPropertyValue(AdditionalOutputTagProperty.class).ifPresent(tag -> {
-            clone.setProperty(AdditionalOutputTagProperty.of(tag));
-          });
-          builder.connectVertices(clone);
-
-          // messageBarrierVertex to the messageAggregatorVertex
-          final IREdge edgeToABV = edgeBetweenMessageVertices(mbv,
-            messageAggregatorVertex, mbvOutputEncoder, mbvOutputDecoder, currentMetricCollectionId);
-          builder.connectVertices(edgeToABV);
-
-          // The original edge
-          // We then insert the vertex with MessageBarrierTransform and vertex with MessageAggregatorTransform
-          // between the vertex and incoming vertices.
-          final IREdge edgeToOriginalDst =
-            new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(), edge.getSrc(),
v);
-          edge.copyExecutionPropertiesTo(edgeToOriginalDst);
-          edgeToOriginalDst.setPropertyPermanently(MessageIdProperty.of(currentMetricCollectionId));
-          builder.connectVertices(edgeToOriginalDst);
-        } else {
-          // NO MATCH, so simply connect vertices as before.
-          builder.connectVertices(edge);
+      modifiedDAG.getIncomingEdgesOf(v).forEach(inEdge -> {
+        if (edgesToOptimize.contains(inEdge)) {
+          inEdge.setPropertyPermanently(MessageIdProperty.of(messageAggregatorVertex.getMessageId()));
         }
-      }
+      });
+    });
+
+    modifiedDAG = builder.build(); // update the DAG.
+  }
+
+  /**
+   * Inserts a set of samplingVertices that process sampled data.
+   *
+   * This method automatically inserts the following three types of edges.
+   * (1) Edges between samplingVertices to reflect the original relationship
+   * (2) Edges from the original IRDAG to samplingVertices that clone the inEdges of the original vertices
+   * (3) Edges from the samplingVertices to the original IRDAG to respect executeAfterSamplingVertices
+   *
+   * Suppose the caller supplies the following arguments to perform a "sampled run" of vertices {V1, V2},
+   * prior to executing them.
+   * - samplingVertices: {V1', V2'}
+   * - executeAfterSamplingVertices: {V1}
+   *
+   * Before: V1 - oneToOneEdge - V2 - shuffleEdge - V3
+   * After: V1' - oneToOneEdge - V2' - controlEdge - V1 - oneToOneEdge - V2 - shuffleEdge - V3
+   *
+   * This preserves semantics as the original IRDAG remains unchanged and unaffected.
+   *
+   * @param samplingVertices to insert.
+   * @param executeAfterSamplingVertices that must be executed after samplingVertices.
+   */
+  public void insert(final Set<SamplingVertex> samplingVertices,
 
 Review comment:
   Why don't we take the `Set` of original vertices to sample, instead of vertices that have already been turned into sampling vertices?
   If we receive sampling vertices, optimization pass builders may hand us sampling vertices that are already connected to each other.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message