nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sa...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-166] Do not attach NodeNamesProperty to Source Stages (#82)
Date Mon, 23 Jul 2018 07:16:30 GMT
This is an automated email from the ASF dual-hosted git repository.

sanha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 2597079  [NEMO-166] Do not attach NodeNamesProperty to Source Stages (#82)
2597079 is described below

commit 2597079df71c682bc6e431ff8034e829fa57a6cf
Author: Jangho Seo <jangho@jangho.io>
AuthorDate: Mon Jul 23 16:16:28 2018 +0900

    [NEMO-166] Do not attach NodeNamesProperty to Source Stages (#82)
    
    JIRA: [NEMO-166: Do not attach NodeNamesProperty to Source Stages](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-166)
    
    **Major changes:**
    - Attaching NodeNamesProperty to source stages makes scheduling source tasks virtually
impossible, if the assigned node does not have the corresponding source split. This PR prevents
NodeNameAssignmentPass not to assign node names on source stages.
    
    **Minor changes to note:**
    - N/A
    
    **Tests for the changes:**
    - N/A
    
    **Other comments:**
    - Integration tests on distributed environment is desired.
    
    resolves [NEMO-166](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-166)
---
 .../annotating/NodeNamesAssignmentPass.java        | 31 +++++++++++++---------
 .../scheduler/NodeShareSchedulingConstraint.java   | 10 ++++---
 2 files changed, 26 insertions(+), 15 deletions(-)

diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java
index 8c6d8a1..f86d355 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java
@@ -81,6 +81,16 @@ public final class NodeNamesAssignmentPass extends AnnotatingPass {
     bandwidthSpecificationString = value;
   }
 
+  private static HashMap<String, Integer> getEvenShares(final List<String> nodes,
final int parallelism) {
+    final HashMap<String, Integer> shares = new HashMap<>();
+    final int defaultShare = parallelism / nodes.size();
+    final int remainder = parallelism % nodes.size();
+    for (int i = 0; i < nodes.size(); i++) {
+      shares.put(nodes.get(i), defaultShare + (i < remainder ? 1 : 0));
+    }
+    return shares;
+  }
+
   private static void assignNodeShares(
       final DAG<IRVertex, IREdge> dag,
       final BandwidthSpecification bandwidthSpecification) {
@@ -89,26 +99,23 @@ public final class NodeNamesAssignmentPass extends AnnotatingPass {
       final int parallelism = irVertex.getPropertyValue(ParallelismProperty.class)
           .orElseThrow(() -> new RuntimeException("Parallelism property required"));
       if (inEdges.size() == 0) {
-        // The stage is root stage.
+        // This vertex is root vertex.
         // Fall back to setting even distribution
-        final HashMap<String, Integer> shares = new HashMap<>();
-        final List<String> nodes = bandwidthSpecification.getNodes();
-        final int defaultShare = parallelism / nodes.size();
-        final int remainder = parallelism % nodes.size();
-        for (int i = 0; i < nodes.size(); i++) {
-          shares.put(nodes.get(i), defaultShare + (i < remainder ? 1 : 0));
-        }
-        irVertex.getExecutionProperties().put(NodeNamesProperty.of(shares));
+        irVertex.getExecutionProperties().put(NodeNamesProperty.of(EMPTY_MAP));
       } else if (isOneToOneEdge(inEdges)) {
-        final Optional<NodeNamesProperty> property = dag.getIncomingEdgesOf(irVertex).iterator().next()
+        final Optional<HashMap<String, Integer>> property = inEdges.iterator().next().getSrc()
             .getExecutionProperties().get(NodeNamesProperty.class);
-        irVertex.getExecutionProperties().put(property.get());
+        irVertex.getExecutionProperties().put(NodeNamesProperty.of(property.get()));
       } else {
         // This IRVertex has shuffle inEdge(s), or has multiple inEdges.
         final Map<String, Integer> parentLocationShares = new HashMap<>();
         for (final IREdge edgeToIRVertex : dag.getIncomingEdgesOf(irVertex)) {
           final IRVertex parentVertex = edgeToIRVertex.getSrc();
-          final Map<String, Integer> shares = parentVertex.getPropertyValue(NodeNamesProperty.class).get();
+          final Map<String, Integer> parentShares = parentVertex.getPropertyValue(NodeNamesProperty.class).get();
+          final int parentParallelism = parentVertex.getPropertyValue(ParallelismProperty.class)
+              .orElseThrow(() -> new RuntimeException("Parallelism property required"));
+          final Map<String, Integer> shares = parentShares.isEmpty() ? getEvenShares(bandwidthSpecification.getNodes(),
+              parentParallelism) : parentShares;
           for (final Map.Entry<String, Integer> element : shares.entrySet()) {
             parentLocationShares.putIfAbsent(element.getKey(), 0);
             parentLocationShares.put(element.getKey(),
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
index 584cd67..6fcbc93 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
@@ -39,7 +39,7 @@ public final class NodeShareSchedulingConstraint implements SchedulingConstraint
     Collections.sort(nodeNames, Comparator.naturalOrder());
     int index = taskIndex;
     for (final String nodeName : nodeNames) {
-      if (index < propertyValue.get(nodeName)) {
+      if (index >= propertyValue.get(nodeName)) {
         index -= propertyValue.get(nodeName);
       } else {
         return nodeName;
@@ -55,7 +55,11 @@ public final class NodeShareSchedulingConstraint implements SchedulingConstraint
     if (propertyValue.isEmpty()) {
       return true;
     }
-    return executor.getNodeName().equals(
-        getNodeName(propertyValue, RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId())));
+    try {
+      return executor.getNodeName().equals(
+          getNodeName(propertyValue, RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId())));
+    } catch (final IllegalStateException e) {
+      throw new RuntimeException(String.format("Cannot schedule %s", task.getTaskId(), e));
+    }
   }
 }


Mime
View raw message