nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From taegeo...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-224] Simple StreamingScheduler (#126)
Date Tue, 23 Oct 2018 23:51:40 GMT
This is an automated email from the ASF dual-hosted git repository.

taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new d2b016e  [NEMO-224] Simple StreamingScheduler (#126)
d2b016e is described below

commit d2b016ea875fcf07a64a1942dd20b037d930d34b
Author: John Yang <johnyangk@gmail.com>
AuthorDate: Wed Oct 24 08:51:36 2018 +0900

    [NEMO-224] Simple StreamingScheduler (#126)
    
    JIRA: [NEMO-224: Simple StreamingScheduler](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-224)
    
    **Major changes:**
    - StreamingScheduler with the following properties:
      - Keeps track of new executors
      - Schedules all tasks in a reverse topological order.
      - Crashes the system upon any other events (should be fixed in the future)
      - Never stops running.
    
    **Minor changes to note:**
    - N/A
    
    **Tests for the changes:**
    - StreamingSchedulerTest
    
    **Other comments:**
    - N/A
    
    Closes #126
---
 .../master/scheduler/StreamingScheduler.java       | 151 +++++++++++++++++++++
 .../master/scheduler/BatchSchedulerTest.java       |   6 +-
 .../master/scheduler/StreamingSchedulerTest.java   |  83 +++++++++++
 3 files changed, 236 insertions(+), 4 deletions(-)

diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/StreamingScheduler.java
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/StreamingScheduler.java
new file mode 100644
index 0000000..d098905
--- /dev/null
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/StreamingScheduler.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.runtime.master.scheduler;
+
+import com.google.common.collect.Lists;
+import org.apache.nemo.common.exception.UnknownExecutionStateException;
+import org.apache.nemo.common.ir.Readable;
+import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.common.plan.PhysicalPlan;
+import org.apache.nemo.runtime.common.plan.Stage;
+import org.apache.nemo.runtime.common.plan.StageEdge;
+import org.apache.nemo.runtime.common.plan.Task;
+import org.apache.nemo.runtime.common.state.TaskState;
+import org.apache.nemo.runtime.master.*;
+import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A simple scheduler for streaming workloads.
+ * <ul>
+ *   <li>Keeps track of new executors.</li>
+ *   <li>Schedules all tasks at once, in a reverse topological order of their stages.</li>
+ *   <li>Crashes the system upon any other events (should be fixed in the future).</li>
+ *   <li>Never stops running.</li>
+ * </ul>
+ */
+@DriverSide
+@NotThreadSafe
+public final class StreamingScheduler implements Scheduler {
+  private static final Logger LOG = LoggerFactory.getLogger(StreamingScheduler.class.getName());
+
+  private final TaskDispatcher taskDispatcher;
+  private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
+  private final ExecutorRegistry executorRegistry;
+  private final PlanStateManager planStateManager;
+
+  /**
+   * Constructor.
+   *
+   * @param taskDispatcher               started in {@link #schedulePlan} and terminated in {@link #terminate()}.
+   * @param pendingTaskCollectionPointer overwritten with every task of a submitted plan.
+   * @param executorRegistry             newly added executors are registered here.
+   * @param planStateManager             tracks the submitted plan and provides the task attempts to schedule.
+   */
+  StreamingScheduler(final TaskDispatcher taskDispatcher,
+                     final PendingTaskCollectionPointer pendingTaskCollectionPointer,
+                     final ExecutorRegistry executorRegistry,
+                     final PlanStateManager planStateManager) {
+    this.taskDispatcher = taskDispatcher;
+    this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
+    this.executorRegistry = executorRegistry;
+    this.planStateManager = planStateManager;
+  }
+
+  /**
+   * Creates the tasks of every stage of the plan and hands all of them over at once.
+   * Stages are visited in reverse topological order, so the tasks of downstream stages precede those of
+   * their upstream stages (presumably so that consumers are placed before producers start emitting).
+   *
+   * @param submittedPhysicalPlan the physical plan to execute.
+   * @param maxScheduleAttempt    the max number of schedule attempts, passed on to the {@link PlanStateManager}.
+   */
+  @Override
+  public void schedulePlan(final PhysicalPlan submittedPhysicalPlan,
+                           final int maxScheduleAttempt) {
+    // Housekeeping stuff
+    taskDispatcher.run();
+    planStateManager.updatePlan(submittedPhysicalPlan, maxScheduleAttempt);
+    planStateManager.storeJSON("submitted");
+
+    // Prepare tasks
+    final List<Stage> reverseTopoStages = Lists.reverse(submittedPhysicalPlan.getStageDAG().getTopologicalSort());
+    final List<Task> reverseTopoTasks = reverseTopoStages.stream().flatMap(stageToSchedule -> {
+      // Helper variables for this stage
+      final List<StageEdge> stageIncomingEdges =
+        submittedPhysicalPlan.getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
+      final List<StageEdge> stageOutgoingEdges =
+        submittedPhysicalPlan.getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
+      final List<Map<String, Readable>> vertexIdToReadables = stageToSchedule.getVertexIdToReadables();
+      final List<String> taskIdsToSchedule = planStateManager.getTaskAttemptsToSchedule(stageToSchedule.getId());
+
+      // Create tasks of this stage; each task gets the readables at its own task index
+      return taskIdsToSchedule.stream().map(taskId -> new Task(
+        submittedPhysicalPlan.getPlanId(),
+        taskId,
+        stageToSchedule.getExecutionProperties(),
+        stageToSchedule.getSerializedIRDAG(),
+        stageIncomingEdges,
+        stageOutgoingEdges,
+        vertexIdToReadables.get(RuntimeIdManager.getIndexFromTaskId(taskId))));
+    }).collect(Collectors.toList());
+
+    // Schedule everything at once
+    pendingTaskCollectionPointer.setToOverwrite(reverseTopoTasks);
+  }
+
+  /**
+   * Not supported yet.
+   *
+   * @param newPhysicalPlan the new physical plan.
+   * @throws UnsupportedOperationException always.
+   */
+  @Override
+  public void updatePlan(final PhysicalPlan newPhysicalPlan) {
+    // TODO #227: StreamingScheduler Dynamic Optimization
+    throw new UnsupportedOperationException("StreamingScheduler does not support updatePlan yet (TODO #227)");
+  }
+
+  /**
+   * Task state reports are not handled yet: every report results in an exception.
+   *
+   * @param executorId       the id of the executor that sent the report.
+   * @param taskId           the id of the reported task.
+   * @param taskAttemptIndex unused here.
+   * @param newState         the reported state.
+   * @param vertexPutOnHold  unused here.
+   * @param failureCause     unused here.
+   * @throws UnsupportedOperationException  for COMPLETE, SHOULD_RETRY, ON_HOLD and FAILED (not supported yet).
+   * @throws RuntimeException               for READY and EXECUTING, which are not expected to be reported.
+   * @throws UnknownExecutionStateException for any other state.
+   */
+  @Override
+  public void onTaskStateReportFromExecutor(final String executorId,
+                                            final String taskId,
+                                            final int taskAttemptIndex,
+                                            final TaskState.State newState,
+                                            @Nullable final String vertexPutOnHold,
+                                            final TaskState.RecoverableTaskFailureCause failureCause) {
+    switch (newState) {
+      case COMPLETE:
+      case SHOULD_RETRY:
+      case ON_HOLD:
+      case FAILED:
+        // TODO #226: StreamingScheduler Fault Tolerance
+        throw new UnsupportedOperationException("StreamingScheduler does not handle " + newState
+          + " reports yet (task " + taskId + " on executor " + executorId + ", TODO #226)");
+      case READY:
+      case EXECUTING:
+        throw new RuntimeException("The states READY/EXECUTING cannot occur at this point");
+      default:
+        throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + newState));
+    }
+  }
+
+  /**
+   * Not supported yet.
+   *
+   * @throws UnsupportedOperationException always.
+   */
+  @Override
+  public void onSpeculativeExecutionCheck() {
+    // TODO #228: StreamingScheduler Speculative Execution
+    throw new UnsupportedOperationException(
+      "StreamingScheduler does not support speculative execution yet (TODO #228)");
+  }
+
+  /**
+   * Logs the new executor and registers it to the executor registry.
+   *
+   * @param executorRepresenter the newly added executor.
+   */
+  @Override
+  public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
+    LOG.info("{} added (node: {})", executorRepresenter.getExecutorId(), executorRepresenter.getNodeName());
+    executorRegistry.registerExecutor(executorRepresenter);
+  }
+
+  /**
+   * Not supported yet.
+   *
+   * @param executorId the id of the removed executor.
+   * @throws UnsupportedOperationException always.
+   */
+  @Override
+  public void onExecutorRemoved(final String executorId) {
+    // TODO #226: StreamingScheduler Fault Tolerance
+    throw new UnsupportedOperationException(
+      "StreamingScheduler cannot handle the removal of executor " + executorId + " yet (TODO #226)");
+  }
+
+  /**
+   * Terminates the task dispatcher and the executor registry.
+   */
+  @Override
+  public void terminate() {
+    this.taskDispatcher.terminate();
+    this.executorRegistry.terminate();
+  }
+}
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
index 3021afa..b4591f4 100644
--- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
@@ -27,7 +27,6 @@ import org.apache.nemo.runtime.master.PlanStateManager;
 import org.apache.nemo.runtime.master.MetricMessageHandler;
 import org.apache.nemo.runtime.master.BlockManagerMaster;
 import org.apache.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
-import org.apache.nemo.runtime.master.resource.ContainerManager;
 import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.nemo.runtime.master.resource.ResourceSpecification;
 import org.apache.nemo.common.dag.DAG;
@@ -57,11 +56,10 @@ import static org.mockito.Mockito.mock;
  * Tests {@link BatchScheduler}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ContainerManager.class, BlockManagerMaster.class,
-    PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class})
+@PrepareForTest({BlockManagerMaster.class, PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class})
 public final class BatchSchedulerTest {
   private static final Logger LOG = LoggerFactory.getLogger(BatchSchedulerTest.class.getName());
-  private Scheduler scheduler;
+  private BatchScheduler scheduler;
   private PlanStateManager planStateManager;
   private ExecutorRegistry executorRegistry;
   private final MessageSender<ControlMessage.Message> mockMsgSender = mock(MessageSender.class);
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/StreamingSchedulerTest.java
b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/StreamingSchedulerTest.java
new file mode 100644
index 0000000..276af97
--- /dev/null
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/StreamingSchedulerTest.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.runtime.master.scheduler;
+
+import org.apache.nemo.common.eventhandler.PubSubEventHandlerWrapper;
+import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.common.plan.PhysicalPlan;
+import org.apache.nemo.runtime.common.plan.TestPlanGenerator;
+import org.apache.nemo.runtime.master.BlockManagerMaster;
+import org.apache.nemo.runtime.master.PlanStateManager;
+import org.apache.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests {@link StreamingScheduler}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({BlockManagerMaster.class, PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class,
+  TaskDispatcher.class, PendingTaskCollectionPointer.class, ExecutorRegistry.class, PlanStateManager.class})
+public final class StreamingSchedulerTest {
+  /** Number of task attempts that the mocked {@link PlanStateManager} returns for every stage. */
+  private static final int ATTEMPTS_PER_STAGE = 2;
+
+  private StreamingScheduler scheduler;
+  private PendingTaskCollectionPointer pendingTaskCollectionPointer;
+
+  /**
+   * Builds a {@link StreamingScheduler} on top of mocked dependencies.
+   * The mocked {@link PlanStateManager} returns {@link #ATTEMPTS_PER_STAGE} task attempts for each stage.
+   */
+  @Before
+  public void setUp() throws Exception {
+    final TaskDispatcher taskDispatcher = mock(TaskDispatcher.class);
+    final ExecutorRegistry executorRegistry = mock(ExecutorRegistry.class);
+    final PlanStateManager planStateManager = mock(PlanStateManager.class);
+    pendingTaskCollectionPointer = mock(PendingTaskCollectionPointer.class);
+
+    when(planStateManager.getTaskAttemptsToSchedule(any())).thenAnswer(invocationOnMock -> {
+      final String stageId = invocationOnMock.getArgument(0);
+      return generateAttempts(stageId);
+    });
+
+    scheduler = new StreamingScheduler(taskDispatcher, pendingTaskCollectionPointer, executorRegistry,
+      planStateManager);
+  }
+
+  /**
+   * @param stageId the id of the stage.
+   * @return the ids of the tasks with indices 0 to {@link #ATTEMPTS_PER_STAGE} - 1 of the stage, in index order.
+   */
+  private List<String> generateAttempts(final String stageId) {
+    return IntStream.range(0, ATTEMPTS_PER_STAGE)
+      .mapToObj(taskIndex -> RuntimeIdManager.generateTaskId(stageId, taskIndex, 0))
+      .collect(Collectors.toList());
+  }
+
+  /**
+   * Checks that every task of every stage is handed over in a single call,
+   * ordered by the reverse topological order of the stages.
+   */
+  @Test(timeout=10000)
+  public void testScheduleEverything() throws Exception {
+    final PhysicalPlan physicalPlan =
+      TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
+    final int numOfTotalTasks = physicalPlan.getStageDAG().getVertices().size() * ATTEMPTS_PER_STAGE;
+
+    // Expected order: the attempts of each stage, with the stages visited in reverse topological order
+    final List<String> topoSortedStageIds = physicalPlan.getStageDAG().getTopologicalSort().stream()
+      .map(stage -> stage.getId())
+      .collect(Collectors.toList());
+    final List<String> expectedTaskIds = IntStream.range(0, topoSortedStageIds.size())
+      .mapToObj(i -> topoSortedStageIds.get(topoSortedStageIds.size() - 1 - i))
+      .flatMap(stageId -> generateAttempts(stageId).stream())
+      .collect(Collectors.toList());
+
+    scheduler.schedulePlan(physicalPlan, 1);
+
+    verify(pendingTaskCollectionPointer).setToOverwrite(argThat(tasks -> tasks.size() == numOfTotalTasks
+      && tasks.stream().map(task -> task.getTaskId()).collect(Collectors.toList()).equals(expectedTaskIds)));
+  }
+}


Mime
View raw message