nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeongy...@apache.org
Subject [incubator-nemo] 07/40: merge update
Date Fri, 06 Apr 2018 02:35:47 GMT
This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch skew_exp
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

commit ce1967606b689893ac44f46418d28c3d23cae316
Merge: 2825151 7c169a6
Author: sanha <sanhaleehana@naver.com>
AuthorDate: Thu Mar 8 14:18:17 2018 +0900

    merge update

 .travis.yml                                        |   7 +-
 README.md                                          |  11 +-
 .../main/java/edu/snu/nemo/common/ir/Readable.java |  10 +-
 .../DuplicateEdgeGroupPropertyValue.java           |   1 +
 .../common/ir/vertex/InitializedSourceVertex.java  |   7 +-
 .../java/edu/snu/nemo/common/test/ArgBuilder.java  |  19 +-
 .../edu/snu/nemo/common/test/ExampleTestUtil.java  |   5 +
 .../spark/source/SparkBoundedSourceVertex.java     |   5 +
 .../optimizer/examples/EmptyComponents.java        |   4 +
 .../beam/AlternatingLeastSquareITCase.java         |  25 +-
 .../snu/nemo/examples/beam/BroadcastITCase.java    |  17 +-
 .../snu/nemo/examples/beam/MapReduceITCase.java    |  22 +-
 .../beam/MultinomialLogisticRegressionITCase.java  |  39 +--
 examples/resources/sample_executor_resources.json  |   6 +-
 .../common/message/ncs/NcsMessageEnvironment.java  |   1 -
 runtime/common/src/main/proto/ControlMessage.proto |   3 +-
 .../edu/snu/nemo/driver/UserApplicationRunner.java |  17 +-
 .../runtime/executor/data/LimitedInputStream.java  |   1 -
 .../executor/data/streamchainer/Serializer.java    |   2 +-
 .../nemo/runtime/master/BlockManagerMaster.java    |  53 +++-
 .../edu/snu/nemo/runtime/master/RuntimeMaster.java | 175 ++++--------
 .../runtime/master/resource/ContainerManager.java  | 172 ++++--------
 .../master/resource/ExecutorRepresenter.java       |  60 ++++
 .../master/scheduler/BatchSingleJobScheduler.java  | 311 +++++++++++----------
 .../{resource => scheduler}/ExecutorRegistry.java  | 102 +++----
 .../scheduler/RoundRobinSchedulingPolicy.java      | 159 ++++++-----
 .../nemo/runtime/master/scheduler/Scheduler.java   |   9 +-
 .../runtime/master/scheduler/SchedulerRunner.java  |  13 +-
 .../runtime/master/scheduler/SchedulingPolicy.java |  14 +-
 .../SourceLocationAwareSchedulingPolicy.java       | 166 +++++++----
 .../snu/nemo/tests/compiler/CompilerTestUtil.java  |   2 +-
 .../snu/nemo/tests/runtime/RuntimeTestUtil.java    | 151 ++--------
 .../runtime/executor/TaskGroupExecutorTest.java    |   4 +
 .../executor/datatransfer/DataTransferTest.java    |   6 +-
 .../tests/runtime/master/ContainerManagerTest.java |  17 +-
 .../scheduler/BatchSingleJobSchedulerTest.java     | 138 ++++-----
 .../master/scheduler/FaultToleranceTest.java       |  17 +-
 .../scheduler/RoundRobinSchedulingPolicyTest.java  |  22 +-
 .../SourceLocationAwareSchedulingPolicyTest.java   |  43 +--
 39 files changed, 902 insertions(+), 934 deletions(-)

diff --cc runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index d77f7de,c76e64a..aa30396
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@@ -71,9 -85,12 +87,13 @@@ public final class ExecutorRepresenter 
    public void onExecutorFailed() {
      runningTaskGroups.forEach(taskGroupId -> failedTaskGroups.add(taskGroupId));
      runningTaskGroups.clear();
 +    smallTaskGroups.clear();
    }
  
+   /**
+    * Marks the TaskGroup as running, and sends scheduling message to the executor.
+    * @param scheduledTaskGroup
+    */
    public void onTaskGroupScheduled(final ScheduledTaskGroup scheduledTaskGroup) {
      runningTaskGroups.add(scheduledTaskGroup.getTaskGroupId());
      failedTaskGroups.remove(scheduledTaskGroup.getTaskGroupId());
@@@ -103,22 -121,27 +127,33 @@@
      messageSender.send(message);
    }
  
+   /**
+    * Marks the specified TaskGroup as completed.
+    * @param taskGroupId id of the TaskGroup
+    */
    public void onTaskGroupExecutionComplete(final String taskGroupId) {
      runningTaskGroups.remove(taskGroupId);
 +    smallTaskGroups.remove(taskGroupId);
      completeTaskGroups.add(taskGroupId);
    }
  
+   /**
+    * Marks the specified TaskGroup as failed.
+    * @param taskGroupId id of the TaskGroup
+    */
    public void onTaskGroupExecutionFailed(final String taskGroupId) {
      runningTaskGroups.remove(taskGroupId);
 +    smallTaskGroups.remove(taskGroupId);
      failedTaskGroups.add(taskGroupId);
    }
  
 +  public Set<String> getSmallTaskGroups() {
 +    return smallTaskGroups;
 +  }
 +
+   /**
+    * @return how many TaskGroups can this executor simultaneously run
+    */
    public int getExecutorCapacity() {
      return resourceSpecification.getCapacity();
    }
diff --cc runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
index 46d6ec0,efcf671..36fdc69
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
@@@ -26,7 -30,8 +30,8 @@@ import java.util.Set
   * Defines the policy by which {@link BatchSingleJobScheduler} assigns task groups to executors.
   */
  @DriverSide
+ @ThreadSafe
 -@DefaultImplementation(RoundRobinSchedulingPolicy.class)
 +@DefaultImplementation(SourceLocationAwareSchedulingPolicy.class)
  public interface SchedulingPolicy {
  
    /**
diff --cc runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
index da8f5f7,de59057..bf92c87
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
@@@ -149,18 -202,22 +202,23 @@@ public final class SourceLocationAwareS
     * @return list of executors, which resides in one of {@code nodeNames}, has container type of {@code containerType},
     *         and has an empty slot for execution
     */
-   private synchronized List<ExecutorRepresenter> selectExecutorByContainerTypeAndNodeNames(
+   private List<ExecutorRepresenter> selectExecutorByContainerTypeAndNodeNames(
        final String containerType, final Set<String> nodeNames) {
-     final Stream<ExecutorRepresenter> localNodesWithSpareCapacity = executorRegistry.getRunningExecutorIds().stream()
-         .map(executorId -> executorRegistry.getRunningExecutorRepresenter(executorId))
-         .filter(executor -> executor.getRunningTaskGroups().size() - executor.getSmallTaskGroups().size()
-             < executor.getExecutorCapacity())
-         .filter(executor -> nodeNames.contains(executor.getNodeName()));
-     if (containerType.equals(ExecutorPlacementProperty.NONE)) {
-       return localNodesWithSpareCapacity.collect(Collectors.toList());
-     } else {
-       return localNodesWithSpareCapacity.filter(executor -> executor.getContainerType().equals(containerType))
-           .collect(Collectors.toList());
+     lock.lock();
+     try {
+       final Stream<ExecutorRepresenter> localNodesWithSpareCapacity = executorRegistry.getRunningExecutorIds().stream()
+           .map(executorId -> executorRegistry.getRunningExecutorRepresenter(executorId))
 -          .filter(executor -> executor.getRunningTaskGroups().size() < executor.getExecutorCapacity())
++          .filter(executor -> executor.getRunningTaskGroups().size() - executor.getSmallTaskGroups().size()
++              < executor.getExecutorCapacity())
+           .filter(executor -> nodeNames.contains(executor.getNodeName()));
+       if (containerType.equals(ExecutorPlacementProperty.NONE)) {
+         return localNodesWithSpareCapacity.collect(Collectors.toList());
+       } else {
+         return localNodesWithSpareCapacity.filter(executor -> executor.getContainerType().equals(containerType))
+             .collect(Collectors.toList());
+       }
+     } finally {
+       lock.unlock();
      }
    }
  

-- 
To stop receiving notification emails like this one, please contact
jeongyoon@apache.org.

Mime
View raw message