nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jan...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-55] Handle NCS Master-to-Executor RPC failures (#71)
Date Thu, 12 Jul 2018 04:52:46 GMT
This is an automated email from the ASF dual-hosted git repository.

jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 6804b98  [NEMO-55] Handle NCS Master-to-Executor RPC failures (#71)
6804b98 is described below

commit 6804b98be19c18f819c7b26355631df365d4533c
Author: John Yang <johnyangk@gmail.com>
AuthorDate: Thu Jul 12 13:52:44 2018 +0900

    [NEMO-55] Handle NCS Master-to-Executor RPC failures (#71)
    
    JIRA: [NEMO-55: Handle NCS Master-to-Executor RPC failures](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-55)
    
    **Major changes:**
    - Ignores NCS RPC failures, assuming that executor failures will be handled by the FailedEvaluator event
    - Introduces the concept of 'poisoned' resources for integration tests
    - Improves the scheduling logic in the master, and exception handling logic in the data plane to pass the added integration test
    
    **Minor changes to note:**
    - Reorders some methods to group similar methods together
    - Prettier logs and more helpful comments
    
    **Tests for the changes:**
    - AlternatingLeastSquareITCase#testPadoWithPoison: Fails the TRANSIENT resource every 1-3 seconds. On my Mac, the resource fails and is reacquired around 3-6 times before the job completes and the test passes.
    
    **Other comments:**
    - https://issues.apache.org/jira/browse/NEMO-140 is filed for more general handling of RPCs
    - Will soon file issues for refactoring the data plane and for making it easier to see how exceptions are handled
    
    resolves [NEMO-55](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-55)
---
 .../main/java/edu/snu/nemo/client/JobLauncher.java |   2 +-
 .../src/main/java/edu/snu/nemo/common/dag/DAG.java |   2 +-
 .../java/edu/snu/nemo/common/test/ArgBuilder.java  |   5 +
 conf/src/main/java/edu/snu/nemo/conf/JobConf.java  |  55 ++---
 .../beam/AlternatingLeastSquareITCase.java         |  24 ++-
 .../beam/policy/PadoPolicyParallelismTen.java      |  44 ++++
 .../beam_sample_poisoned_executor_resources.json   |  13 ++
 .../common/message/FailedMessageSender.java        |  42 ++++
 .../common/message/ncs/NcsMessageContext.java      |   4 +-
 .../common/message/ncs/NcsMessageEnvironment.java  |  30 +--
 .../snu/nemo/runtime/common/metric/TaskMetric.java |   2 +-
 .../snu/nemo/runtime/common/plan/StageEdge.java    |   5 +
 .../edu/snu/nemo/runtime/common/plan/Task.java     |   7 +-
 .../snu/nemo/runtime/common/state/TaskState.java   |   2 +
 .../main/java/edu/snu/nemo/driver/NemoContext.java |  33 ++-
 .../main/java/edu/snu/nemo/driver/NemoDriver.java  |   4 +-
 .../edu/snu/nemo/driver/UserApplicationRunner.java |   6 +-
 .../edu/snu/nemo/runtime/executor/Executor.java    |   5 +-
 .../nemo/runtime/executor/MetricManagerWorker.java |   4 +-
 .../executor/bytetransfer/ByteTransferContext.java |   6 +-
 .../executor/bytetransfer/ByteTransport.java       |   8 +-
 .../bytetransfer/ClosableBlockingQueue.java        |  16 +-
 .../runtime/executor/data/BlockManagerWorker.java  | 235 ++++++++++++---------
 ...ctionQueue.java => BlockTransferThrottler.java} |  13 +-
 .../runtime/executor/datatransfer/InputReader.java |   9 +-
 .../nemo/runtime/executor/task/TaskExecutor.java   |   4 +-
 ...ueTest.java => BlockTransferThrottlerTest.java} |  25 ++-
 .../nemo/runtime/master/BlockManagerMaster.java    |   4 +-
 .../edu/snu/nemo/runtime/master/BlockMetadata.java |  10 +-
 .../snu/nemo/runtime/master/JobStateManager.java   |   4 +-
 .../edu/snu/nemo/runtime/master/RuntimeMaster.java |  14 +-
 .../runtime/master/resource/ContainerManager.java  |  22 +-
 .../master/resource/ExecutorRepresenter.java       |  18 --
 .../master/resource/ResourceSpecification.java     |  65 ++----
 .../master/scheduler/BatchSingleJobScheduler.java  |  46 ++--
 .../runtime/master/scheduler/SchedulerRunner.java  |   6 +-
 .../nemo/runtime/master/ContainerManagerTest.java  |  10 +-
 .../{TaskRestartTest.java => TaskRetryTest.java}   |   4 +-
 38 files changed, 487 insertions(+), 321 deletions(-)

diff --git a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
index 6826174..0b80b83 100644
--- a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
+++ b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
@@ -245,7 +245,7 @@ public final class JobLauncher {
     cl.registerShortNameOfClass(JobConf.BandwidthJSONPath.class);
     cl.registerShortNameOfClass(JobConf.JVMHeapSlack.class);
     cl.registerShortNameOfClass(JobConf.IORequestHandleThreadsTotal.class);
-    cl.registerShortNameOfClass(JobConf.MaxScheduleAttempt.class);
+    cl.registerShortNameOfClass(JobConf.MaxTaskAttempt.class);
     cl.registerShortNameOfClass(JobConf.FileDirectory.class);
     cl.registerShortNameOfClass(JobConf.GlusterVolumeDirectory.class);
     cl.registerShortNameOfClass(JobConf.PartitionTransportServerPort.class);
diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAG.java b/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
index 46198f0..aeaea6f 100644
--- a/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
+++ b/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
@@ -411,7 +411,7 @@ public final class DAG<V extends Vertex, E extends Edge<V>> implements Serializa
     try (final PrintWriter printWriter = new PrintWriter(file)) {
       printWriter.println(toString());
       printWriter.close();
-      LOG.info(String.format("DAG JSON for %s is saved at %s"
+      LOG.debug(String.format("DAG JSON for %s is saved at %s"
           + " (Use https://service.jangho.kr/nemo-dag/ to visualize it.)", description, file.getPath()));
     } catch (IOException e) {
       LOG.warn(String.format("Cannot store JSON representation of %s to %s: %s",
diff --git a/common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java b/common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java
index aeb27a8..9b11b56 100644
--- a/common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java
+++ b/common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java
@@ -71,6 +71,11 @@ public final class ArgBuilder {
     return this;
   }
 
+  public ArgBuilder addMaxTaskAttempt(final int maxAttempt) {
+    args.add(Arrays.asList("-max_task_attempt", String.valueOf(maxAttempt)));
+    return this;
+  }
+
   /**
    * @param directory directory to save the DAG.
    * @return builder with the DAG directory.
diff --git a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
index bed8099..8d016af 100644
--- a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
+++ b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
@@ -99,7 +99,7 @@ public final class JobConf extends ConfigurationModuleBuilder {
   public final class OptimizationPolicy implements Name<String> {
   }
 
-  //////////////////////////////// Runtime Configurations
+  //////////////////////////////// Runtime Master-Executor Common Configurations
 
   /**
    * Deploy mode.
@@ -109,6 +109,16 @@ public final class JobConf extends ConfigurationModuleBuilder {
   }
 
   /**
+   * The fraction of container memory not to use fo the JVM heap.
+   */
+  @NamedParameter(doc = "The fraction of the container memory not to use for the JVM heap", short_name = "heap_slack",
+      default_value = "0.3")
+  public final class JVMHeapSlack implements Name<Double> {
+  }
+
+  //////////////////////////////// Runtime Master Configurations
+
+  /**
    * Nemo driver memory.
    */
   @NamedParameter(doc = "Nemo driver memory", short_name = "driver_mem_mb", default_value = "1024")
@@ -116,6 +126,24 @@ public final class JobConf extends ConfigurationModuleBuilder {
   }
 
   /**
+<<<<<<< HEAD
+   * Max number of attempts for task scheduling.
+   */
+  @NamedParameter(doc = "Max number of task attempts", short_name = "max_task_attempt", default_value = "1")
+  public final class MaxTaskAttempt implements Name<Integer> {
+  }
+
+  //////////////////////////////// Runtime Executor Configurations
+
+  /**
+   * Used for fault-injected tests.
+   */
+  @NamedParameter(doc = "Executor crashes after expected time, does not crash when -1",
+      short_name = "executor_poison_sec", default_value = "-1")
+  public final class ExecutorPosionSec implements Name<Integer> {
+  }
+
+  /**
    * Path to the JSON file that specifies bandwidth between locations.
    */
   @NamedParameter(doc = "Path to the JSON file that specifies bandwidth between locations",
@@ -138,13 +166,6 @@ public final class JobConf extends ConfigurationModuleBuilder {
   public final class BandwidthJSONContents implements Name<String> {
   }
 
-  /**
-   * The fraction of container memory not to use fo the JVM heap.
-   */
-  @NamedParameter(doc = "The fraction of the container memory not to use for the JVM heap", short_name = "heap_slack",
-      default_value = "0.3")
-  public final class JVMHeapSlack implements Name<Double> {
-  }
 
   /**
    * Contents of the JSON file that specifies resource layout.
@@ -153,16 +174,7 @@ public final class JobConf extends ConfigurationModuleBuilder {
   public final class ExecutorJSONContents implements Name<String> {
   }
 
-  /**
-   * Executor capacity.
-   * Determines the number of Task 'slots' for each executor.
-   * 1) Master's Task scheduler can use this number in scheduling.
-   *    (e.g., schedule Task to the executor currently with the maximum number of available slots)
-   * 2) Executor's number of Task execution threads is set to this number.
-   */
-  @NamedParameter(doc = "Executor capacity", short_name = "executor_capacity", default_value = "1")
-  public final class ExecutorCapacity implements Name<Integer> {
-  }
+  //////////////////////////////// Runtime Data Plane Configurations
 
   /**
    * Number of I/O threads for block fetch requests from other executor.
@@ -181,13 +193,6 @@ public final class JobConf extends ConfigurationModuleBuilder {
   }
 
   /**
-   * Max number of attempts for task scheduling.
-   */
-  @NamedParameter(doc = "Max number of schedules", short_name = "max_schedule_attempt", default_value = "3")
-  public final class MaxScheduleAttempt implements Name<Integer> {
-  }
-
-  /**
    * The number of serialization threads for scheduling.
    */
   @NamedParameter(doc = "Number of serialization thread for scheduling", short_name = "schedule_ser_thread",
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
index e4e71ae..a5b1f39 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
@@ -18,8 +18,9 @@ package edu.snu.nemo.examples.beam;
 import edu.snu.nemo.client.JobLauncher;
 import edu.snu.nemo.common.test.ArgBuilder;
 import edu.snu.nemo.common.test.ExampleTestUtil;
-import edu.snu.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
+import edu.snu.nemo.compiler.optimizer.policy.DefaultPolicy;
 import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelismFive;
+import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelismTen;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -33,7 +34,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
 public final class AlternatingLeastSquareITCase {
-  private static final int TIMEOUT = 240000;
+  private static final int TIMEOUT = 240 * 1000;
   private static ArgBuilder builder;
   private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
 
@@ -41,7 +42,8 @@ public final class AlternatingLeastSquareITCase {
   private static final String outputFileName = "sample_output_als";
   private static final String output = fileBasePath + outputFileName;
   private static final String testResourceFileName = "test_output_als";
-  private static final String executorResourceFileName = fileBasePath + "beam_sample_executor_resources.json";
+  private static final String noPoisonResources = fileBasePath + "beam_sample_executor_resources.json";
+  private static final String poisonedResource = fileBasePath + "beam_sample_poisoned_executor_resources.json";
   private static final String numFeatures = "10";
   private static final String numIteration = "3";
   private static final String lambda = "0.05";
@@ -49,7 +51,6 @@ public final class AlternatingLeastSquareITCase {
   @Before
   public void setUp() throws Exception {
     builder = new ArgBuilder()
-        .addResourceJson(executorResourceFileName)
         .addUserMain(AlternatingLeastSquare.class.getCanonicalName())
         .addUserArgs(input, numFeatures, numIteration, lambda, output);
   }
@@ -64,18 +65,21 @@ public final class AlternatingLeastSquareITCase {
   }
 
   @Test (timeout = TIMEOUT)
-  public void test() throws Exception {
+  public void testDefault() throws Exception {
     JobLauncher.main(builder
-        .addJobId(AlternatingLeastSquareITCase.class.getSimpleName())
-        .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+        .addResourceJson(noPoisonResources)
+        .addJobId(AlternatingLeastSquareITCase.class.getSimpleName() + "_default")
+        .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
         .build());
   }
 
   @Test (timeout = TIMEOUT)
-  public void testPado() throws Exception {
+  public void testPadoWithPoison() throws Exception {
     JobLauncher.main(builder
-        .addJobId(AlternatingLeastSquareITCase.class.getSimpleName() + "_pado")
-        .addOptimizationPolicy(PadoPolicyParallelismFive.class.getCanonicalName())
+        .addResourceJson(poisonedResource)
+        .addJobId(AlternatingLeastSquareITCase.class.getSimpleName() + "_pado_poisoned")
+        .addMaxTaskAttempt(Integer.MAX_VALUE)
+        .addOptimizationPolicy(PadoPolicyParallelismTen.class.getCanonicalName())
         .build());
   }
 }
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/PadoPolicyParallelismTen.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/PadoPolicyParallelismTen.java
new file mode 100644
index 0000000..53e82cc
--- /dev/null
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/PadoPolicyParallelismTen.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.examples.beam.policy;
+
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
+import edu.snu.nemo.compiler.optimizer.policy.PadoPolicy;
+import edu.snu.nemo.compiler.optimizer.policy.Policy;
+import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
+
+import java.util.List;
+
+/**
+ * A pado policy with fixed parallelism 10 for tests.
+ */
+public final class PadoPolicyParallelismTen implements Policy {
+  private final Policy policy;
+
+  public PadoPolicyParallelismTen() {
+    this.policy = PolicyTestUtil.overwriteParallelism(10, PadoPolicy.class.getCanonicalName());
+  }
+
+  @Override
+  public List<CompileTimePass> getCompileTimePasses() {
+    return this.policy.getCompileTimePasses();
+  }
+
+  @Override
+  public List<RuntimePass<?>> getRuntimePasses() {
+    return this.policy.getRuntimePasses();
+  }
+}
diff --git a/examples/resources/beam_sample_poisoned_executor_resources.json b/examples/resources/beam_sample_poisoned_executor_resources.json
new file mode 100644
index 0000000..b7614a9
--- /dev/null
+++ b/examples/resources/beam_sample_poisoned_executor_resources.json
@@ -0,0 +1,13 @@
+[
+  {
+    "type": "Transient",
+    "memory_mb": 512,
+    "capacity": 15,
+    "poison_sec": 2
+  },
+  {
+    "type": "Reserved",
+    "memory_mb": 512,
+    "capacity": 15
+  }
+]
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/FailedMessageSender.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/FailedMessageSender.java
new file mode 100644
index 0000000..1218b1e
--- /dev/null
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/FailedMessageSender.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.message;
+
+import edu.snu.nemo.runtime.common.comm.ControlMessage;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A message sender that failed.
+ */
+public final class FailedMessageSender implements MessageSender<ControlMessage.Message> {
+  @Override
+  public void send(final ControlMessage.Message message) {
+    // Do nothing.
+  }
+
+  @Override
+  public CompletableFuture<ControlMessage.Message> request(final ControlMessage.Message message) {
+    final CompletableFuture<ControlMessage.Message> failed = new CompletableFuture<>();
+    failed.completeExceptionally(new Throwable("Failed Message Sender"));
+    return failed;
+  }
+
+  @Override
+  public void close() throws Exception {
+    // Do nothing.
+  }
+}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageContext.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageContext.java
index ea478b7..beefcef 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageContext.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageContext.java
@@ -56,7 +56,9 @@ final class NcsMessageContext implements MessageContext {
       // We do not call connection.close since NCS caches connection.
       // Disabling Sonar warning (squid:S2095)
     } catch (final NetworkException e) {
-      throw new RuntimeException("Cannot connect to " + senderId, e);
+      // TODO #140: Properly classify and handle each RPC failure
+      // Not logging the stacktrace here, as it's not very useful.
+      LOG.error("NCS Exception");
     }
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
index 4d07752..676cb15 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
@@ -27,10 +27,11 @@ import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.IdentifierFactory;
 import org.apache.reef.wake.remote.transport.LinkListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
 import java.net.SocketAddress;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -41,6 +42,8 @@ import java.util.concurrent.Future;
  * Message environment for NCS.
  */
 public final class NcsMessageEnvironment implements MessageEnvironment {
+  private static final Logger LOG = LoggerFactory.getLogger(NcsMessageEnvironment.class.getName());
+
   private static final String NCS_CONN_FACTORY_ID = "NCS_CONN_FACTORY_ID";
 
   private final NetworkConnectionService networkConnectionService;
@@ -62,7 +65,7 @@ public final class NcsMessageEnvironment implements MessageEnvironment {
     this.senderId = senderId;
     this.replyFutureMap = new ReplyFutureMap<>();
     this.listenerConcurrentMap = new ConcurrentHashMap<>();
-    this.receiverToConnectionMap = new HashMap<>();
+    this.receiverToConnectionMap = new ConcurrentHashMap<>();
     this.connectionFactory = networkConnectionService.registerConnectionFactory(
         idFactory.getNewInstance(NCS_CONN_FACTORY_ID),
         new ControlMessageCodec(),
@@ -87,17 +90,15 @@ public final class NcsMessageEnvironment implements MessageEnvironment {
   public <T> Future<MessageSender<T>> asyncConnect(final String receiverId, final String listenerId) {
     try {
       // If the connection toward the receiver exists already, reuses it.
-      final Connection connection = receiverToConnectionMap.computeIfAbsent(receiverId, absentReceiverId -> {
-        try {
-          final Connection newConnection = connectionFactory.newConnection(idFactory.getNewInstance(absentReceiverId));
-          newConnection.open();
-          return newConnection;
-        } catch (final NetworkException e) {
-          throw new RuntimeException(e);
-        }
-      });
+      final Connection connection;
+      if (receiverToConnectionMap.containsKey(receiverId)) {
+        connection = receiverToConnectionMap.get(receiverId);
+      } else {
+        connection = connectionFactory.newConnection(idFactory.getNewInstance(receiverId));
+        connection.open();
+      }
       return CompletableFuture.completedFuture((MessageSender) new NcsMessageSender(connection, replyFutureMap));
-    } catch (final Exception e) {
+    } catch (final NetworkException e) {
       final CompletableFuture<MessageSender<T>> failedFuture = new CompletableFuture<>();
       failedFuture.completeExceptionally(e);
       return failedFuture;
@@ -166,8 +167,9 @@ public final class NcsMessageEnvironment implements MessageEnvironment {
     public void onException(final Throwable throwable,
                             final SocketAddress socketAddress,
                             final Message<ControlMessage.Message> messages) {
-      final ControlMessage.Message controlMessage = extractSingleMessage(messages);
-      throw new RuntimeException(controlMessage.toString(), throwable);
+      // TODO #140: Properly classify and handle each RPC failure
+      // Not logging the stacktrace here, as it's not very useful.
+      LOG.error("NCS Exception");
     }
   }
 
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/TaskMetric.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/TaskMetric.java
index db24f4d..aaa2337 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/TaskMetric.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/TaskMetric.java
@@ -112,7 +112,7 @@ public class TaskMetric implements StateMetric<TaskState.State> {
 
   @Override
   public final boolean processMetricMessage(final String metricField, final byte[] metricValue) {
-    LOG.info("metric {} is just arrived!", metricField);
+    LOG.debug("metric {} is just arrived!", metricField);
     switch (metricField) {
       case "serializedReadBytes":
         setSerializedReadBytes(SerializationUtils.deserialize(metricValue));
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index ad62e68..5750c2a 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -119,6 +119,11 @@ public final class StageEdge extends RuntimeEdge<Stage> {
     return sb.toString();
   }
 
+  @Override
+  public String toString() {
+    return propertiesToJSON();
+  }
+
   /**
    * @return the list between the task idx and key range to read.
    */
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
index ce0a4ff..7663a8c 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
@@ -18,7 +18,6 @@ package edu.snu.nemo.runtime.common.plan;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
-import org.apache.commons.lang.SerializationUtils;
 
 import java.io.Serializable;
 import java.util.List;
@@ -145,8 +144,10 @@ public final class Task implements Serializable {
     sb.append(taskId);
     sb.append(" / attempt: ");
     sb.append(attemptIdx);
-    sb.append(" / irDAG: ");
-    sb.append(SerializationUtils.deserialize(serializedIRDag));
+    sb.append(" / incoming: ");
+    sb.append(taskIncomingEdges);
+    sb.append(" / outgoing: ");
+    sb.append(taskOutgoingEdges);
     sb.append("/ exec props: ");
     sb.append(getExecutionProperties());
     return sb.toString();
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
index 74b808c..be83bf2 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
@@ -57,6 +57,8 @@ public final class TaskState {
 
     // From SHOULD_RETRY
     stateMachineBuilder.addTransition(State.SHOULD_RETRY, State.READY, "Ready to be retried");
+    stateMachineBuilder.addTransition(State.SHOULD_RETRY, State.SHOULD_RETRY,
+        "SHOULD_RETRY can be caused by multiple reasons");
 
     stateMachineBuilder.setInitialState(State.READY);
     return stateMachineBuilder.build();
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoContext.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoContext.java
index d89b7e0..491e950 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoContext.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoContext.java
@@ -15,16 +15,20 @@
  */
 package edu.snu.nemo.driver;
 
+import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.executor.Executor;
 import org.apache.reef.annotations.audience.EvaluatorSide;
 import org.apache.reef.evaluator.context.events.ContextStart;
 import org.apache.reef.evaluator.context.events.ContextStop;
+import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.annotations.Unit;
 import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
+import java.util.Random;
 
 /**
  * REEF Context for the Executor.
@@ -32,13 +36,21 @@ import javax.inject.Inject;
 @EvaluatorSide
 @Unit
 public final class NemoContext {
-
   private static final Logger LOG = LoggerFactory.getLogger(NemoContext.class.getName());
   private final Executor executor;
 
+  private final Clock clock;
+  private final int crashTimeSec;
+
   @Inject
-  private NemoContext(final Executor executor) {
+  private NemoContext(final Executor executor,
+                      @Parameter(JobConf.ExecutorPosionSec.class) final int crashTimeSec,
+                      final Clock clock) {
     this.executor = executor; // To make Tang instantiate Executor
+
+    // For poison handling
+    this.clock = clock;
+    this.crashTimeSec = crashTimeSec;
   }
 
   /**
@@ -48,6 +60,16 @@ public final class NemoContext {
     @Override
     public void onNext(final ContextStart contextStart) {
       LOG.info("Context Started: Executor is now ready and listening for messages");
+
+      // For poison handling
+      if (crashTimeSec >= 0) {
+        final int crashTimeMs = addNoise(crashTimeSec * 1000);
+        LOG.info("Configured {} sec crash time, and actually crashing in {} ms (noise)", crashTimeSec, crashTimeMs);
+        clock.scheduleAlarm(crashTimeMs, (alarm) -> {
+          LOG.info("Poison: crashing immediately");
+          Runtime.getRuntime().halt(1); // Forces this JVM to shut down immediately.
+        });
+      }
     }
   }
 
@@ -55,10 +77,15 @@ public final class NemoContext {
    * Called when the context is stopped.
    */
   public final class ContextStopHandler implements EventHandler<ContextStop> {
-
     @Override
     public void onNext(final ContextStop contextStop) {
       executor.terminate();
     }
   }
+
+  private int addNoise(final int number) {
+    final Random random = new Random();
+    final int fiftyPercent = random.nextInt((int) (number * (50.0 / 100.0)));
+    return random.nextBoolean() ? number + fiftyPercent : number - fiftyPercent; // -50% ~ +50%
+  }
 }
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
index f4493e8..4519116 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
@@ -23,6 +23,7 @@ import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageParameters;
 import edu.snu.nemo.runtime.master.ClientRPC;
 import edu.snu.nemo.runtime.master.RuntimeMaster;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.driver.client.JobMessageObserver;
 import org.apache.reef.driver.context.ActiveContext;
@@ -158,7 +159,8 @@ public final class NemoDriver {
    */
   public void startSchedulingUserApplication(final String dagString) {
     // Launch user application (with a new thread)
-    final ExecutorService userApplicationRunnerThread = Executors.newSingleThreadExecutor();
+    final ExecutorService userApplicationRunnerThread = Executors.newSingleThreadExecutor(
+        new BasicThreadFactory.Builder().namingPattern("User App thread-%d").build());
     userApplicationRunnerThread.execute(() -> userApplicationRunner.run(dagString));
     userApplicationRunnerThread.shutdown();
   }
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
index 6ba615f..de2dcb0 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
@@ -58,7 +58,7 @@ public final class UserApplicationRunner {
   @Inject
   private UserApplicationRunner(@Parameter(JobConf.DAGDirectory.class) final String dagDirectory,
                                 @Parameter(JobConf.OptimizationPolicy.class) final String optimizationPolicy,
-                                @Parameter(JobConf.MaxScheduleAttempt.class) final int maxScheduleAttempt,
+                                @Parameter(JobConf.MaxTaskAttempt.class) final int maxScheduleAttempt,
                                 final PubSubEventHandlerWrapper pubSubEventHandlerWrapper,
                                 final Injector injector,
                                 final RuntimeMaster runtimeMaster) {
@@ -80,7 +80,7 @@ public final class UserApplicationRunner {
    */
   public void run(final String dagString) {
     try {
-      LOG.info("##### Nemo Compiler #####");
+      LOG.info("##### Nemo Compiler Start #####");
 
       final DAG<IRVertex, IREdge> dag = SerializationUtils.deserialize(Base64.getDecoder().decode(dagString));
       dag.storeJSON(dagDirectory, "ir", "IR before optimization");
@@ -103,6 +103,8 @@ public final class UserApplicationRunner {
 
       final PhysicalPlan physicalPlan = backend.compile(optimizedDAG);
 
+      LOG.info("##### Nemo Compiler Finish #####");
+
       physicalPlan.getStageDAG().storeJSON(dagDirectory, "plan", "physical execution plan by compiler");
 
       // Execute!
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index 8852e0e..25f6224 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -37,6 +37,7 @@ import edu.snu.nemo.runtime.executor.data.SerializerManager;
 import edu.snu.nemo.runtime.executor.datatransfer.DataTransferFactory;
 import edu.snu.nemo.runtime.executor.task.TaskExecutor;
 import org.apache.commons.lang3.SerializationUtils;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.reef.tang.annotations.Parameter;
 
 import javax.inject.Inject;
@@ -80,7 +81,9 @@ public final class Executor {
                   final DataTransferFactory dataTransferFactory,
                   final MetricManagerWorker metricMessageSender) {
     this.executorId = executorId;
-    this.executorService = Executors.newCachedThreadPool();
+    this.executorService = Executors.newCachedThreadPool(new BasicThreadFactory.Builder()
+        .namingPattern("TaskExecutor thread-%d")
+        .build());
     this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
     this.serializerManager = serializerManager;
     this.dataTransferFactory = dataTransferFactory;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
index 405ce4c..d8f3b41 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
@@ -70,10 +70,10 @@ public final class MetricManagerWorker implements MetricMessageSender {
 
       final ControlMessage.MetricMsg.Builder metricMsgBuilder = ControlMessage.MetricMsg.newBuilder();
 
-      LOG.info("MetricManagerWorker Size: {}", size);
+      LOG.debug("MetricManagerWorker Size: {}", size);
       for (int i = 0; i < size; i++) {
         final ControlMessage.Metric metric = metricMessageQueue.poll();
-        LOG.info("MetricManagerWorker addMetric: {}, {}, {}", size, i, metric);
+        LOG.debug("MetricManagerWorker addMetric: {}, {}, {}", size, i, metric);
         metricMsgBuilder.addMetric(i, metric);
       }
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransferContext.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransferContext.java
index ec8977b..7cfc898 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransferContext.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransferContext.java
@@ -119,11 +119,7 @@ public abstract class ByteTransferContext {
       return;
     }
     hasException = true;
-    if (cause == null) {
-      LOG.error(String.format("A channel exception set on %s", toString()));
-    } else {
-      LOG.error(String.format("A channel exception set on %s", toString()), cause);
-    }
+    LOG.error(String.format("A channel exception set on %s", toString())); // Not logging throwable, which isn't useful
     exception = cause;
   }
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransport.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransport.java
index fa5116b..3e7aed6 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransport.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransport.java
@@ -209,12 +209,8 @@ final class ByteTransport implements AutoCloseable {
         LOG.debug("Connected to {}", remoteExecutorId);
         return;
       }
-      // Failed to connect
-      if (future.cause() == null) {
-        LOG.error("Failed to connect to {}", remoteExecutorId);
-      } else {
-        LOG.error(String.format("Failed to connect to %s", remoteExecutorId), future.cause());
-      }
+      // Failed to connect (Not logging the cause here, which is not very useful)
+      LOG.error("Failed to connect to {}", remoteExecutorId);
     });
     return connectFuture;
   }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ClosableBlockingQueue.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ClosableBlockingQueue.java
index 872b275..faf3ed1 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ClosableBlockingQueue.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ClosableBlockingQueue.java
@@ -91,13 +91,15 @@ public final class ClosableBlockingQueue<T> implements AutoCloseable {
    */
   @Nullable
   public synchronized T take() throws InterruptedException {
+    while (queue.isEmpty() && !closed) {
+      wait();
+    }
+
+    // Check only after the wait loop, so a throwable set while this thread was blocked in wait() is still observed
     if (throwable != null) {
       throw new RuntimeException(throwable);
     }
 
-    while (queue.isEmpty() && !closed) {
-      wait();
-    }
     // retrieves and removes the head of the underlying collection, or return null if the queue is empty
     return queue.poll();
   }
@@ -110,13 +112,15 @@ public final class ClosableBlockingQueue<T> implements AutoCloseable {
    */
   @Nullable
   public synchronized T peek() throws InterruptedException {
+    while (queue.isEmpty() && !closed) {
+      wait();
+    }
+
+    // Check only after the wait loop, so a throwable set while this thread was blocked in wait() is still observed
     if (throwable != null) {
       throw new RuntimeException(throwable);
     }
 
-    while (queue.isEmpty() && !closed) {
-      wait();
-    }
     // retrieves the head of the underlying collection, or return null if the queue is empty
     return queue.peek();
   }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index c3d41f7..d6a3da7 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -61,18 +61,23 @@ public final class BlockManagerWorker {
   private static final String REMOTE_FILE_STORE = "REMOTE_FILE_STORE";
 
   private final String executorId;
+  private final SerializerManager serializerManager;
+
+  // Block stores
   private final MemoryStore memoryStore;
   private final SerializedMemoryStore serializedMemoryStore;
   private final LocalFileStore localFileStore;
   private final RemoteFileStore remoteFileStore;
+
+  // To-Master connections
   private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
+  private final Map<String, CompletableFuture<ControlMessage.Message>> pendingBlockLocationRequest;
+
+  // To-Executor connections
   private final ByteTransfer byteTransfer;
-  // Executor service to schedule I/O Runnable which can be done in background.
   private final ExecutorService backgroundExecutorService;
   private final Map<String, AtomicInteger> blockToRemainingRead;
-  private final SerializerManager serializerManager;
-  private final Map<String, CompletableFuture<ControlMessage.Message>> pendingBlockLocationRequest;
-  private final BlockTransferConnectionQueue blockTransferConnectionQueue;
+  private final BlockTransferThrottler blockTransferThrottler;
 
   /**
    * Constructor.
@@ -86,7 +91,7 @@ public final class BlockManagerWorker {
    * @param persistentConnectionToMasterMap the connection map.
    * @param byteTransfer                    the byte transfer.
    * @param serializerManager               the serializer manager.
-   * @param blockTransferConnectionQueue    restricts parallel connections
+   * @param blockTransferThrottler    restricts parallel connections
    */
   @Inject
   private BlockManagerWorker(@Parameter(JobConf.ExecutorId.class) final String executorId,
@@ -98,7 +103,7 @@ public final class BlockManagerWorker {
                              final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
                              final ByteTransfer byteTransfer,
                              final SerializerManager serializerManager,
-                             final BlockTransferConnectionQueue blockTransferConnectionQueue) {
+                             final BlockTransferThrottler blockTransferThrottler) {
     this.executorId = executorId;
     this.memoryStore = memoryStore;
     this.serializedMemoryStore = serializedMemoryStore;
@@ -110,9 +115,11 @@ public final class BlockManagerWorker {
     this.blockToRemainingRead = new ConcurrentHashMap<>();
     this.serializerManager = serializerManager;
     this.pendingBlockLocationRequest = new ConcurrentHashMap<>();
-    this.blockTransferConnectionQueue = blockTransferConnectionQueue;
+    this.blockTransferThrottler = blockTransferThrottler;
   }
 
+  //////////////////////////////////////////////////////////// Main public methods
+
   /**
    * Creates a new block.
    *
@@ -128,52 +135,6 @@ public final class BlockManagerWorker {
   }
 
   /**
-   * Retrieves data from the stored block. A specific hash value range can be designated.
-   *
-   * @param blockId    of the block.
-   * @param blockStore for the data storage.
-   * @param keyRange   the key range descriptor.
-   * @return the result data in the block.
-   */
-  private CompletableFuture<DataUtil.IteratorWithNumBytes> getDataFromLocalBlock(
-      final String blockId,
-      final InterTaskDataStoreProperty.Value blockStore,
-      final KeyRange keyRange) {
-    final BlockStore store = getBlockStore(blockStore);
-
-    // First, try to fetch the block from local BlockStore.
-    final Optional<Block> optionalBlock = store.readBlock(blockId);
-
-    if (optionalBlock.isPresent()) {
-      final Iterable<NonSerializedPartition> partitions = optionalBlock.get().readPartitions(keyRange);
-      handleUsedData(blockStore, blockId);
-
-      // Block resides in this evaluator!
-      try {
-        final Iterator innerIterator = DataUtil.concatNonSerPartitions(partitions).iterator();
-        long numSerializedBytes = 0;
-        long numEncodedBytes = 0;
-        try {
-          for (final NonSerializedPartition partition : partitions) {
-            numSerializedBytes += partition.getNumSerializedBytes();
-            numEncodedBytes += partition.getNumEncodedBytes();
-          }
-
-          return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator, numSerializedBytes,
-              numEncodedBytes));
-        } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
-          return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator));
-        }
-      } catch (final IOException e) {
-        throw new BlockFetchException(e);
-      }
-    } else {
-      // We don't have the block here...
-      throw new RuntimeException(String.format("Block %s not found in local BlockManagerWorker", blockId));
-    }
-  }
-
-  /**
    * Inquiries the location of the specific block and routes the request to the local block manager worker
    * or to the lower data plane.
    * This can be invoked multiple times per blockId (maybe due to failures).
@@ -184,7 +145,7 @@ public final class BlockManagerWorker {
    * @param keyRange      the key range descriptor
    * @return the {@link CompletableFuture} of the block.
    */
-  public CompletableFuture<DataUtil.IteratorWithNumBytes> queryBlock(
+  public CompletableFuture<DataUtil.IteratorWithNumBytes> readBlock(
       final String blockId,
       final String runtimeEdgeId,
       final InterTaskDataStoreProperty.Value blockStore,
@@ -192,7 +153,10 @@ public final class BlockManagerWorker {
     // Let's see if a remote worker has it
     final CompletableFuture<ControlMessage.Message> blockLocationFuture =
         pendingBlockLocationRequest.computeIfAbsent(blockId, blockIdToRequest -> {
-          // Ask Master for the location
+          // Ask Master for the location.
+          // (IMPORTANT): This 'request' effectively blocks the TaskExecutor thread if the block is IN_PROGRESS.
+          // We use this property to make the receiver task of a 'push' edge to wait in an Executor for its input data
+          // to become available.
           final CompletableFuture<ControlMessage.Message> responseFromMasterFuture = persistentConnectionToMasterMap
               .getMessageSender(MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID).request(
                   ControlMessage.Message.newBuilder()
@@ -221,7 +185,7 @@ public final class BlockManagerWorker {
           responseFromMaster.getBlockLocationInfoMsg();
       if (!blockLocationInfoMsg.hasOwnerExecutorId()) {
         throw new BlockFetchException(new Throwable(
-            "Block " + blockId + " not found both in any storage: "
+            "Block " + blockId + " location unknown: "
                 + "The block state is " + blockLocationInfoMsg.getState()));
       }
       // This is the executor id that we wanted to know
@@ -236,11 +200,24 @@ public final class BlockManagerWorker {
             .setRuntimeEdgeId(runtimeEdgeId)
             .setKeyRange(ByteString.copyFrom(SerializationUtils.serialize(keyRange)))
             .build();
-        final CompletableFuture<ByteInputContext> contextFuture = blockTransferConnectionQueue
-            .requestConnectPermission(runtimeEdgeId)
+        final CompletableFuture<ByteInputContext> contextFuture = blockTransferThrottler
+            .requestTransferPermission(runtimeEdgeId)
             .thenCompose(obj -> byteTransfer.newInputContext(targetExecutorId, descriptor.toByteArray()));
-        contextFuture.thenApply(context -> context.getCompletedFuture()
-            .thenAccept(f -> blockTransferConnectionQueue.onConnectionFinished(runtimeEdgeId)));
+
+        // whenComplete() ensures that blockTransferThrottler.onTransferFinished() is always called,
+        // even on failures. Actual failure handling and Task retry will be done by DataFetcher.
+        contextFuture.whenComplete((connectionContext, connectionThrowable) -> {
+          if (connectionThrowable != null) {
+            // Something wrong with the connection. Notify blockTransferThrottler immediately.
+            blockTransferThrottler.onTransferFinished(runtimeEdgeId);
+          } else {
+            // Connection is okay. Notify blockTransferThrottler when the actual transfer is done, or fails.
+            connectionContext.getCompletedFuture().whenComplete((transferContext, transferThrowable) -> {
+              blockTransferThrottler.onTransferFinished(runtimeEdgeId);
+            });
+          }
+        });
+
         return contextFuture
             .thenApply(context -> new DataUtil.InputStreamIterator(context.getInputStreams(),
                 serializerManager.getSerializer(runtimeEdgeId)));
@@ -364,48 +341,7 @@ public final class BlockManagerWorker {
     }
   }
 
-  /**
-   * Handles used {@link edu.snu.nemo.runtime.executor.data.block.Block}.
-   *
-   * @param blockStore the store which contains the block.
-   * @param blockId    the ID of the block.
-   */
-  private void handleUsedData(final InterTaskDataStoreProperty.Value blockStore,
-                              final String blockId) {
-    final AtomicInteger remainingExpectedRead = blockToRemainingRead.get(blockId);
-    if (remainingExpectedRead != null) {
-      if (remainingExpectedRead.decrementAndGet() == 0) {
-        // This block should be discarded.
-        blockToRemainingRead.remove(blockId);
-        backgroundExecutorService.submit(new Runnable() {
-          @Override
-          public void run() {
-            removeBlock(blockId, blockStore);
-          }
-        });
-      }
-    } // If null, just keep the data in the store.
-  }
-
-  /**
-   * Gets the {@link BlockStore} from annotated value of {@link InterTaskDataStoreProperty}.
-   * @param blockStore the annotated value of {@link InterTaskDataStoreProperty}.
-   * @return the block store.
-   */
-  private BlockStore getBlockStore(final InterTaskDataStoreProperty.Value blockStore) {
-    switch (blockStore) {
-      case MemoryStore:
-        return memoryStore;
-      case SerializedMemoryStore:
-        return serializedMemoryStore;
-      case LocalFileStore:
-        return localFileStore;
-      case GlusterFileStore:
-        return remoteFileStore;
-      default:
-        throw new UnsupportedBlockStoreException(new Exception(blockStore + " is not supported."));
-    }
-  }
+  //////////////////////////////////////////////////////////// Public methods for remote block I/O
 
   /**
    * Respond to a block request by another executor.
@@ -472,6 +408,101 @@ public final class BlockManagerWorker {
     throw new IllegalStateException("No logic here");
   }
 
+  //////////////////////////////////////////////////////////// Private helper methods
+
+  /**
+   * Retrieves data from a block held in a local block store. A specific hash value range can be designated.
+   *
+   * @param blockId    the ID of the block to read.
+   * @param blockStore the store that holds the block.
+   * @param keyRange   the key range of the partitions to read.
+   * @return an already-completed future of the data, carrying byte counts when the partitions provide them.
+   */
+  private CompletableFuture<DataUtil.IteratorWithNumBytes> getDataFromLocalBlock(
+      final String blockId,
+      final InterTaskDataStoreProperty.Value blockStore,
+      final KeyRange keyRange) {
+    final BlockStore store = getBlockStore(blockStore);
+
+    // Look the block up in the local BlockStore only.
+    final Optional<Block> optionalBlock = store.readBlock(blockId);
+
+    if (optionalBlock.isPresent()) {
+      final Iterable<NonSerializedPartition> partitions = optionalBlock.get().readPartitions(keyRange);
+      handleUsedData(blockStore, blockId); // counts this read; may schedule the block's removal
+
+      // Block resides in this evaluator: expose its partitions as one concatenated iterator.
+      try {
+        final Iterator innerIterator = DataUtil.concatNonSerPartitions(partitions).iterator();
+        long numSerializedBytes = 0;
+        long numEncodedBytes = 0;
+        try { // on NumBytesNotSupportedException, fall back to an iterator without byte counts
+          for (final NonSerializedPartition partition : partitions) {
+            numSerializedBytes += partition.getNumSerializedBytes();
+            numEncodedBytes += partition.getNumEncodedBytes();
+          }
+
+          return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator, numSerializedBytes,
+              numEncodedBytes));
+        } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
+          return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator));
+        }
+      } catch (final IOException e) {
+        throw new BlockFetchException(e);
+      }
+    } else {
+      // The block is not in the local store.
+      throw new RuntimeException(String.format("Block %s not found in local BlockManagerWorker", blockId));
+    }
+  }
+
+
+  /**
+   * Handles a used {@link edu.snu.nemo.runtime.executor.data.block.Block} by counting down its expected reads.
+   *
+   * @param blockStore the store which contains the block.
+   * @param blockId    the ID of the block.
+   */
+  private void handleUsedData(final InterTaskDataStoreProperty.Value blockStore,
+                              final String blockId) {
+    final AtomicInteger remainingExpectedRead = blockToRemainingRead.get(blockId);
+    if (remainingExpectedRead != null) {
+      if (remainingExpectedRead.decrementAndGet() == 0) {
+        // That was the last expected read: drop the counter and remove the block in the background.
+        blockToRemainingRead.remove(blockId);
+        backgroundExecutorService.submit(new Runnable() {
+          @Override
+          public void run() {
+            removeBlock(blockId, blockStore);
+          }
+        });
+      }
+    } // If null, reads of this block are not tracked, so just keep the data in the store.
+  }
+
+  //////////////////////////////////////////////////////////// Converters
+
+  /**
+   * Maps a {@link InterTaskDataStoreProperty} value to this worker's corresponding {@link BlockStore}.
+   * @param blockStore the annotated value of {@link InterTaskDataStoreProperty}.
+   * @return the block store; unsupported values raise an {@link UnsupportedBlockStoreException}.
+   */
+  private BlockStore getBlockStore(final InterTaskDataStoreProperty.Value blockStore) {
+    switch (blockStore) {
+      case MemoryStore:
+        return memoryStore;
+      case SerializedMemoryStore:
+        return serializedMemoryStore;
+      case LocalFileStore:
+        return localFileStore;
+      case GlusterFileStore: // served by the remote file store
+        return remoteFileStore;
+      default:
+        throw new UnsupportedBlockStoreException(new Exception(blockStore + " is not supported."));
+    }
+  }
+
+
   /**
    * Decodes BlockStore property from protocol buffer.
    * @param blockStore property from protocol buffer
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueue.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferThrottler.java
similarity index 86%
rename from runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueue.java
rename to runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferThrottler.java
index 0870082..8d9f8a1 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueue.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferThrottler.java
@@ -17,6 +17,8 @@ package edu.snu.nemo.runtime.executor.data;
 
 import edu.snu.nemo.conf.JobConf;
 import org.apache.reef.tang.annotations.Parameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
 import java.util.ArrayDeque;
@@ -30,13 +32,14 @@ import java.util.concurrent.CompletableFuture;
  * Executors can suffer from performance degradation and network-related exceptions when there are massive connections,
  * especially under low network bandwidth or high volume of data.
  */
-public final class BlockTransferConnectionQueue {
+public final class BlockTransferThrottler {
+  private static final Logger LOG = LoggerFactory.getLogger(BlockTransferThrottler.class.getName());
   private final Map<String, Integer> runtimeEdgeIdToNumCurrentConnections = new HashMap<>();
   private final Map<String, Queue<CompletableFuture<Void>>> runtimeEdgeIdToPendingConnections = new HashMap<>();
   private final int maxNum;
 
   @Inject
-  private BlockTransferConnectionQueue(@Parameter(JobConf.MaxNumDownloadsForARuntimeEdge.class) final int maxNum) {
+  private BlockTransferThrottler(@Parameter(JobConf.MaxNumDownloadsForARuntimeEdge.class) final int maxNum) {
     this.maxNum = maxNum;
   }
 
@@ -45,7 +48,7 @@ public final class BlockTransferConnectionQueue {
    * @param runtimeEdgeId the corresponding runtime edge id.
    * @return a future that will be completed when the connection is granted.
    */
-  public synchronized CompletableFuture<Void> requestConnectPermission(final String runtimeEdgeId) {
+  public synchronized CompletableFuture<Void> requestTransferPermission(final String runtimeEdgeId) {
     runtimeEdgeIdToNumCurrentConnections.putIfAbsent(runtimeEdgeId, 0);
     runtimeEdgeIdToPendingConnections.computeIfAbsent(runtimeEdgeId, id -> new ArrayDeque<>());
     final int currentOutstandingConnections = runtimeEdgeIdToNumCurrentConnections.get(runtimeEdgeId);
@@ -63,10 +66,10 @@ public final class BlockTransferConnectionQueue {
   }
 
   /**
-   * Indicates the connection has finished.
+   * Indicates the transfer has finished.
    * @param runtimeEdgeId the corresponding runtime edge id.
    */
-  public synchronized void onConnectionFinished(final String runtimeEdgeId) {
+  public synchronized void onTransferFinished(final String runtimeEdgeId) {
     final Queue<CompletableFuture<Void>> pendingConnections = runtimeEdgeIdToPendingConnections.get(runtimeEdgeId);
     if (pendingConnections.size() == 0) {
       // Just decrease the number of current connections.
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index 2b29b5f..e417c82 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -31,6 +31,8 @@ import edu.snu.nemo.common.exception.UnsupportedCommPatternException;
 import edu.snu.nemo.runtime.common.data.HashRange;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
@@ -43,6 +45,7 @@ import java.util.stream.StreamSupport;
  * Represents the input data transfer to a task.
  */
 public final class InputReader extends DataTransfer {
+  private static final Logger LOG = LoggerFactory.getLogger(InputReader.class.getName());
   private final int dstTaskIndex;
   private final BlockManagerWorker blockManagerWorker;
 
@@ -89,7 +92,7 @@ public final class InputReader extends DataTransfer {
     final String blockId = getBlockId(dstTaskIndex);
     final Optional<InterTaskDataStoreProperty.Value> dataStoreProperty
         = runtimeEdge.getPropertyValue(InterTaskDataStoreProperty.class);
-    return blockManagerWorker.queryBlock(blockId, getId(), dataStoreProperty.get(), HashRange.all());
+    return blockManagerWorker.readBlock(blockId, getId(), dataStoreProperty.get(), HashRange.all());
   }
 
   private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readBroadcast() {
@@ -100,7 +103,7 @@ public final class InputReader extends DataTransfer {
     final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = new ArrayList<>();
     for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
       final String blockId = getBlockId(srcTaskIdx);
-      futures.add(blockManagerWorker.queryBlock(blockId, getId(), dataStoreProperty.get(), HashRange.all()));
+      futures.add(blockManagerWorker.readBlock(blockId, getId(), dataStoreProperty.get(), HashRange.all()));
     }
 
     return futures;
@@ -127,7 +130,7 @@ public final class InputReader extends DataTransfer {
     for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
       final String blockId = getBlockId(srcTaskIdx);
       futures.add(
-          blockManagerWorker.queryBlock(blockId, getId(), dataStoreProperty.get(), hashRangeToRead));
+          blockManagerWorker.readBlock(blockId, getId(), dataStoreProperty.get(), hashRangeToRead));
     }
 
     return futures;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
index 741bb30..bfcd040 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
@@ -325,7 +325,9 @@ public final class TaskExecutor {
   }
 
   private void handleMainOutputElement(final VertexHarness harness, final Object element) {
-    harness.getWritersToChildrenTasks().forEach(outputWriter -> outputWriter.write(element));
+    harness.getWritersToChildrenTasks().forEach(outputWriter -> {
+      outputWriter.write(element);
+    });
     if (harness.getSideInputChildren().size() > 0) {
       sideInputMap.put(((OperatorVertex) harness.getIRVertex()).getTransform().getTag(), element);
     }
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueueTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferThrottlerTest.java
similarity index 73%
rename from runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueueTest.java
rename to runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferThrottlerTest.java
index eeb0031..2c815b7 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueueTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferThrottlerTest.java
@@ -16,7 +16,6 @@
 package edu.snu.nemo.runtime.executor.data;
 
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.executor.data.BlockTransferConnectionQueue;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
@@ -29,22 +28,22 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import static org.junit.Assert.assertFalse;
 
-public final class BlockTransferConnectionQueueTest {
-  private static final String THREAD_NAME = BlockTransferConnectionQueue.class.getSimpleName() + "-TestThread";
+public final class BlockTransferThrottlerTest {
+  private static final String THREAD_NAME = BlockTransferThrottler.class.getSimpleName() + "-TestThread";
   private static final String RUNTIME_EDGE_0 = "RuntimeEdge0";
   private static final int WAIT_TIME = 1000;
   /**
-   * Creates {@link BlockTransferConnectionQueue} for testing.
+   * Creates {@link BlockTransferThrottler} for testing.
    * @param maxNum value for {@link JobConf.MaxNumDownloadsForARuntimeEdge} parameter.
-   * @return {@link BlockTransferConnectionQueue} object created.
+   * @return {@link BlockTransferThrottler} object created.
    */
-  private final BlockTransferConnectionQueue getQueue(final int maxNum) {
+  private final BlockTransferThrottler getQueue(final int maxNum) {
     final Configuration conf = Tang.Factory.getTang().newConfigurationBuilder()
         .bindNamedParameter(JobConf.MaxNumDownloadsForARuntimeEdge.class, String.valueOf(maxNum))
         .build();
     final Injector injector = Tang.Factory.getTang().newInjector(conf);
     try {
-      return injector.getInstance(BlockTransferConnectionQueue.class);
+      return injector.getInstance(BlockTransferThrottler.class);
     } catch (final InjectionException e) {
       throw new RuntimeException(e);
     }
@@ -54,13 +53,13 @@ public final class BlockTransferConnectionQueueTest {
   public void test() throws InterruptedException, ExecutionException {
     final ExecutorService executorService = Executors.newSingleThreadExecutor(
         runnable -> new Thread(runnable, THREAD_NAME));
-    final BlockTransferConnectionQueue queue = getQueue(3);
+    final BlockTransferThrottler queue = getQueue(3);
     final Future executorServiceFuture = executorService.submit(() -> {
       try {
-        queue.requestConnectPermission(RUNTIME_EDGE_0).get();
-        queue.requestConnectPermission(RUNTIME_EDGE_0).get();
-        queue.requestConnectPermission(RUNTIME_EDGE_0).get();
-        queue.requestConnectPermission(RUNTIME_EDGE_0).get();
+        queue.requestTransferPermission(RUNTIME_EDGE_0).get();
+        queue.requestTransferPermission(RUNTIME_EDGE_0).get();
+        queue.requestTransferPermission(RUNTIME_EDGE_0).get();
+        queue.requestTransferPermission(RUNTIME_EDGE_0).get(); // 4th request exceeds maxNum (3), so it waits
       } catch (final InterruptedException | ExecutionException e) {
         throw new RuntimeException(e);
       }
@@ -68,7 +67,7 @@ public final class BlockTransferConnectionQueueTest {
     Thread.sleep(WAIT_TIME);
     // We must have one pending connection request.
     assertFalse(executorServiceFuture.isDone());
-    queue.onConnectionFinished(RUNTIME_EDGE_0);
+    queue.onTransferFinished(RUNTIME_EDGE_0); // releases a slot so the pending request can proceed
     // The remaining request should be accepted before test timeout.
     executorServiceFuture.get();
   }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index da7c1ef..f07fe1b 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -139,7 +139,6 @@ public final class BlockManagerMaster {
    */
   public Set<String> removeWorker(final String executorId) {
     final Set<String> tasksToRecompute = new HashSet<>();
-    LOG.warn("Worker {} is removed.", new Object[]{executorId});
 
     final Lock writeLock = lock.writeLock();
     writeLock.lock();
@@ -252,11 +251,10 @@ public final class BlockManagerMaster {
     writeLock.lock();
     try {
       if (producerTaskIdToBlockIds.containsKey(failedTaskId)) {
-        LOG.info("ProducerTask {} failed for a list of blocks:", failedTaskId);
         producerTaskIdToBlockIds.get(failedTaskId).forEach(blockId -> {
           final BlockState.State state = (BlockState.State)
               blockIdToMetadata.get(blockId).getBlockState().getStateMachine().getCurrentState();
-          LOG.info("Partition lost: {}", blockId);
+          LOG.info("Block lost: {}", blockId);
           onBlockStateChanged(blockId, BlockState.State.NOT_AVAILABLE, null);
         });
       } // else this task does not produce any block
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
index c5957db..9e6d471 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
@@ -61,23 +61,23 @@ final class BlockMetadata {
 
     switch (newState) {
       case IN_PROGRESS:
-        stateMachine.setState(newState);
         break;
       case NOT_AVAILABLE:
-        LOG.info("Block {} lost in {}", new Object[]{blockId, location});
         // Reset the block location and committer information.
         locationHandler.completeExceptionally(new AbsentBlockException(blockId, newState));
         locationHandler = new BlockManagerMaster.BlockLocationRequestHandler(blockId);
-        stateMachine.setState(newState);
         break;
       case AVAILABLE:
-        assert (location != null);
+        if (location == null) {
+          throw new RuntimeException("Null location");
+        }
         locationHandler.complete(location);
-        stateMachine.setState(newState);
         break;
       default:
         throw new UnsupportedOperationException(newState.toString());
     }
+
+    stateMachine.setState(newState);
   }
 
   /**
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
index fd3e582..9ef2f8e 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
@@ -249,7 +249,7 @@ public final class JobStateManager {
     if (newState == JobState.State.EXECUTING) {
       LOG.debug("Executing Job ID {}...", this.jobId);
     } else if (newState == JobState.State.COMPLETE || newState == JobState.State.FAILED) {
-      LOG.info("Job ID {} {}!", new Object[]{jobId, newState});
+      LOG.debug("Job ID {} {}!", new Object[]{jobId, newState});
 
       // Awake all threads waiting the finish of this job.
       finishLock.lock();
@@ -354,7 +354,7 @@ public final class JobStateManager {
     file.getParentFile().mkdirs();
     try (final PrintWriter printWriter = new PrintWriter(file)) {
       printWriter.println(toStringWithPhysicalPlan());
-      LOG.info(String.format("JSON representation of job state for %s(%s) was saved to %s",
+      LOG.debug(String.format("JSON representation of job state for %s(%s) was saved to %s",
           jobId, suffix, file.getPath()));
     } catch (final IOException e) {
       LOG.warn(String.format("Cannot store JSON representation of job state for %s(%s) to %s: %s",
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 43e46cc..806f000 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -108,7 +108,8 @@ public final class RuntimeMaster {
     // since the processing logic in master takes a very short amount of time
     // compared to the job completion times of executed jobs
     // and keeping it single threaded removes the complexity of multi-thread synchronization.
-    this.runtimeMasterThread = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "RuntimeMaster"));
+    this.runtimeMasterThread =
+        Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "RuntimeMaster thread"));
     this.scheduler = scheduler;
     this.containerManager = containerManager;
     this.blockManagerMaster = blockManagerMaster;
@@ -218,13 +219,13 @@ public final class RuntimeMaster {
 
         for (int i = 0; i < jsonRootNode.size(); i++) {
           final TreeNode resourceNode = jsonRootNode.get(i);
-          final ResourceSpecification.Builder builder = ResourceSpecification.newBuilder();
-          builder.setContainerType(resourceNode.get("type").traverse().nextTextValue());
-          builder.setMemory(resourceNode.get("memory_mb").traverse().getIntValue());
-          builder.setCapacity(resourceNode.get("capacity").traverse().getIntValue());
+          final String type = resourceNode.get("type").traverse().nextTextValue();
+          final int memory = resourceNode.get("memory_mb").traverse().getIntValue();
+          final int capacity = resourceNode.get("capacity").traverse().getIntValue();
           final int executorNum = resourceNode.path("num").traverse().nextIntValue(1);
+          final int poisonSec = resourceNode.path("poison_sec").traverse().nextIntValue(-1);
           resourceRequestCount.getAndAdd(executorNum);
-          containerManager.requestContainer(executorNum, builder.build());
+          containerManager.requestContainer(executorNum, new ResourceSpecification(type, capacity, memory, poisonSec));
         }
         metricCountDownLatch = new CountDownLatch(resourceRequestCount.get());
       } catch (final Exception e) {
@@ -288,7 +289,6 @@ public final class RuntimeMaster {
    */
   public void onExecutorFailed(final FailedEvaluator failedEvaluator) {
     runtimeMasterThread.execute(() -> {
-      LOG.info("onExecutorFailed: {}", failedEvaluator.getId());
       metricCountDownLatch.countDown();
 
       // Note that getFailedContextList() can be empty if the failure occurred
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java
index 32d4bd9..a099a11 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java
@@ -17,6 +17,7 @@ package edu.snu.nemo.runtime.master.resource;
 
 import edu.snu.nemo.common.exception.ContainerException;
 import edu.snu.nemo.conf.JobConf;
+import edu.snu.nemo.runtime.common.message.FailedMessageSender;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageSender;
 import org.apache.reef.annotations.audience.DriverSide;
@@ -25,6 +26,8 @@ import org.apache.reef.driver.evaluator.AllocatedEvaluator;
 import org.apache.reef.driver.evaluator.EvaluatorRequest;
 import org.apache.reef.driver.evaluator.EvaluatorRequestor;
 import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.annotations.Parameter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,6 +36,7 @@ import javax.annotation.concurrent.NotThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -146,7 +150,12 @@ public final class ContainerManager {
         + ") allocated, will be used for [" + executorId + "]");
     pendingContextIdToResourceSpec.put(executorId, resourceSpecification);
 
-    allocatedContainer.submitContext(executorConfiguration);
+    // Poison handling: pass the resource's poison time to the executor (-1 if not poisoned)
+    final Configuration poisonConfiguration = Tang.Factory.getTang().newConfigurationBuilder()
+        .bindNamedParameter(JobConf.ExecutorPosionSec.class, String.valueOf(resourceSpecification.getPoisonSec()))
+        .build();
+
+    allocatedContainer.submitContext(Configurations.merge(executorConfiguration, poisonConfiguration));
   }
 
   /**
@@ -165,16 +174,16 @@ public final class ContainerManager {
 
     // We set contextId = executorId in NemoDriver when we generate executor configuration.
     final String executorId = activeContext.getId();
-
     final ResourceSpecification resourceSpec = pendingContextIdToResourceSpec.remove(executorId);
 
     // Connect to the executor and initiate Master side's executor representation.
-    final MessageSender messageSender;
+    MessageSender messageSender;
     try {
       messageSender =
           messageEnvironment.asyncConnect(executorId, MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID).get();
-    } catch (final Exception e) {
-      throw new RuntimeException(e);
+    } catch (final InterruptedException | ExecutionException e) {
+      // TODO #140: Properly classify and handle each RPC failure (note: interrupt status is not restored here)
+      messageSender = new FailedMessageSender();
     }
 
     // Create the executor representation.
@@ -182,9 +191,8 @@ public final class ContainerManager {
         new ExecutorRepresenter(executorId, resourceSpec, messageSender, activeContext, serializationExecutorService,
             activeContext.getEvaluatorDescriptor().getNodeDescriptor().getName());
 
-    LOG.info("{} is up and running at {}", executorId, executorRepresenter.getNodeName());
-
     requestLatchByResourceSpecId.get(resourceSpec.getResourceSpecId()).countDown();
+
     return Optional.of(executorRepresenter);
   }
 
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index e431074..e0df766 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -170,24 +170,6 @@ public final class ExecutorRepresenter {
     return runningTasks;
   }
 
-  public Map<Task, Integer> getRunningTaskToAttempt() {
-    return runningTaskToAttempt;
-  }
-
-  /**
-   * @return set of ids of Tasks that have been failed in this exeuctor
-
-  public Set<String> getFailedTasks() {
-    return failedTasks;
-  }
-
-  /**
-   * @return set of ids of Tasks that have been completed in this executor
-   */
-  public Set<Task> getCompleteTasks() {
-    return completeTasks;
-  }
-
   /**
    * @return the executor id
    */
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ResourceSpecification.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ResourceSpecification.java
index 6a2b2b8..173a477 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ResourceSpecification.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ResourceSpecification.java
@@ -26,14 +26,23 @@ public final class ResourceSpecification {
   private final String containerType;
   private final int capacity;
   private final int memory;
+  private final int poisonSec; // -1 if this resource is not poisoned
 
   public ResourceSpecification(final String containerType,
                                final int capacity,
                                final int memory) {
+    this(containerType, capacity, memory, -1);
+  }
+
+  public ResourceSpecification(final String containerType,
+                               final int capacity,
+                               final int memory,
+                               final int poisonSec) {
     this.resourceSpecId = RuntimeIdGenerator.generateResourceSpecId();
     this.containerType = containerType;
     this.capacity = capacity;
     this.memory = memory;
+    this.poisonSec = poisonSec;
   }
 
   /**
@@ -62,58 +71,10 @@ public final class ResourceSpecification {
   }
 
   /**
-   * @return {@link Builder} for {@link ResourceSpecification}.
+   * @return -1   if this resource is not poisoned. (for all other normal cases)
+   *         >= 0 the expected time to failure by poison. (for fault-handling tests)
    */
-  public static Builder newBuilder() {
-    return new Builder();
-  }
-
-  /**
-   * A Builder class for {@link ResourceSpecification}.
-   */
-  public static final class Builder {
-    private String containerType;
-    private Integer capacity;
-    private Integer memory;
-
-    private Builder() {
-    }
-
-    /**
-     * @param inputContainerType the container type
-     * @return {@link Builder} object.
-     */
-    public Builder setContainerType(final String inputContainerType) {
-      this.containerType = inputContainerType;
-      return this;
-    }
-
-    /**
-     * @param inputCapacity the number of Tasks that can be run in this container
-     * @return {@link Builder} object.
-     */
-    public Builder setCapacity(final int inputCapacity) {
-      this.capacity = inputCapacity;
-      return this;
-    }
-
-    /**
-     * @param inputMemory the size of the memory allocated, in megabytes
-     * @return {@link Builder} object.
-     */
-    public Builder setMemory(final int inputMemory) {
-      this.memory = inputMemory;
-      return this;
-    }
-
-    /**
-     * @return the {@link ResourceSpecification} object that has been built
-     */
-    public ResourceSpecification build() {
-      assert (containerType != null);
-      assert (capacity != null);
-      assert (memory != null);
-      return new ResourceSpecification(containerType, capacity, memory);
-    }
+  public int getPoisonSec() {
+    return poisonSec;
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index 880c8d0..531fced 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -192,7 +192,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
           }
           break;
         case SHOULD_RETRY:
-          // Retry the failed task
+          // Retry the failed task by triggering the scheduling of SHOULD_RETRY tasks
           doSchedule();
           break;
         default:
@@ -224,13 +224,14 @@ public final class BatchSingleJobScheduler implements Scheduler {
 
   @Override
   public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
-    LOG.info("{} added", executorRepresenter.getExecutorId());
+    LOG.info("{} added (node: {})", executorRepresenter.getExecutorId(), executorRepresenter.getNodeName());
     executorRegistry.registerExecutor(executorRepresenter);
     schedulerRunner.onExecutorSlotAvailable();
   }
 
   @Override
   public void onExecutorRemoved(final String executorId) {
+    LOG.info("{} removed", executorId);
     blockManagerMaster.removeWorker(executorId);
 
     // These are tasks that were running at the time of executor removal.
@@ -240,14 +241,8 @@ public final class BatchSingleJobScheduler implements Scheduler {
       return Pair.of(executor, ExecutorRegistry.ExecutorState.FAILED);
     });
 
-    // We need to retry the interrupted tasks, and also recover the tasks' missing input blocks if needed.
-    final Set<String> tasksToReExecute =
-        Sets.union(interruptedTasks, recursivelyGetParentTasksForLostBlocks(interruptedTasks));
-
-    // Report SHOULD_RETRY tasks so they can be re-scheduled
-    LOG.info("{} removed: {} will be retried", executorId, tasksToReExecute);
-    tasksToReExecute.forEach(
-        taskToReExecute -> jobStateManager.onTaskStateChanged(taskToReExecute, TaskState.State.SHOULD_RETRY));
+    // Retry the interrupted tasks (and required parents)
+    retryTasksAndRequiredParents(interruptedTasks);
 
     // Trigger the scheduling of SHOULD_RETRY tasks in the earliest scheduleGroup
     doSchedule();
@@ -263,8 +258,11 @@ public final class BatchSingleJobScheduler implements Scheduler {
 
   /**
    * The main entry point for task scheduling.
-   * This operation can be invoked at any point during job execution, as it is designed to be free of side-effects,
-   * and integrate well with {@link PendingTaskCollectionPointer} and {@link SchedulerRunner}.
+   * This operation can be invoked at any point during job execution, as it is designed to be free of side-effects.
+   *
+   * This holds because:
+   * - We 'reset' {@link PendingTaskCollectionPointer}, and not 'add' new tasks to it
+   * - We make {@link SchedulerRunner} run only tasks that are READY.
    */
   private void doSchedule() {
     final Optional<List<Stage>> earliest = selectEarliestSchedulableGroup();
@@ -275,8 +273,14 @@ public final class BatchSingleJobScheduler implements Scheduler {
           .flatMap(stage -> selectSchedulableTasks(stage).stream())
           .collect(Collectors.toList());
 
-      LOG.info("Attempting to schedule {} in the same ScheduleGroup",
-          tasksToSchedule.stream().map(Task::getTaskId).collect(Collectors.toList()));
+      // We prefer (but do not guarantee) to schedule the 'receiving' tasks first,
+      // assuming that tasks within a ScheduleGroup are connected with 'push' edges.
+      Collections.reverse(tasksToSchedule);
+
+      LOG.info("Scheduling some tasks in {}, which are in the same ScheduleGroup", tasksToSchedule.stream()
+          .map(Task::getTaskId)
+          .map(RuntimeIdGenerator::getStageIdFromTaskId)
+          .collect(Collectors.toSet()));
 
       // Set the pointer to the schedulable tasks.
       pendingTaskCollectionPointer.setToOverwrite(tasksToSchedule);
@@ -289,6 +293,10 @@ public final class BatchSingleJobScheduler implements Scheduler {
   }
 
   private Optional<List<Stage>> selectEarliestSchedulableGroup() {
+    if (sortedScheduleGroups == null) {
+      return Optional.empty();
+    }
+
     return sortedScheduleGroups.stream()
         .filter(scheduleGroup -> scheduleGroup.stream()
             .map(Stage::getId)
@@ -431,10 +439,20 @@ public final class BatchSingleJobScheduler implements Scheduler {
       default:
         throw new UnknownFailureCauseException(new Throwable("Unknown cause: " + failureCause));
     }
+
+    retryTasksAndRequiredParents(Collections.singleton(taskId));
   }
 
   ////////////////////////////////////////////////////////////////////// Helper methods
 
+  private void retryTasksAndRequiredParents(final Set<String> tasks) {
+    final Set<String> requiredParents = recursivelyGetParentTasksForLostBlocks(tasks);
+    final Set<String> tasksToRetry = Sets.union(tasks, requiredParents);
+    LOG.info("Will be retried: {}", tasksToRetry);
+    tasksToRetry.forEach(
+        taskToReExecute -> jobStateManager.onTaskStateChanged(taskToReExecute, TaskState.State.SHOULD_RETRY));
+  }
+
   private Set<String> recursivelyGetParentTasksForLostBlocks(final Set<String> children) {
     if (children.isEmpty()) {
       return Collections.emptySet();
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index caf0d40..0b0524e 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -66,7 +66,8 @@ public final class SchedulerRunner {
                          final ExecutorRegistry executorRegistry) {
     this.jobStateManagers = new HashMap<>();
     this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
-    this.schedulerThread = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "SchedulerRunner"));
+    this.schedulerThread = Executors.newSingleThreadExecutor(runnable ->
+        new Thread(runnable, "SchedulerRunner thread"));
     this.isSchedulerRunning = false;
     this.isTerminated = false;
     this.executorRegistry = executorRegistry;
@@ -114,7 +115,6 @@ public final class SchedulerRunner {
         continue;
       }
 
-      LOG.debug("Trying to schedule {}...", task.getTaskId());
       executorRegistry.viewExecutors(executors -> {
         final MutableObject<Set<ExecutorRepresenter>> candidateExecutors = new MutableObject<>(executors);
         task.getExecutionProperties().forEachProperties(property -> {
@@ -132,6 +132,8 @@ public final class SchedulerRunner {
           // update metadata first
           jobStateManager.onTaskStateChanged(task.getTaskId(), TaskState.State.EXECUTING);
 
+          LOG.info("{} scheduled to {}", task.getTaskId(), selectedExecutor.getExecutorId());
+
           // send the task
           selectedExecutor.onTaskScheduled(task);
         } else {
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/ContainerManagerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/ContainerManagerTest.java
index 18dd6cf..441dd31 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/ContainerManagerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/ContainerManagerTest.java
@@ -107,7 +107,7 @@ public final class ContainerManagerTest {
         containerManager.onContainerAllocated(
             executorId,
             createMockEvaluator(evaluatorId, descriptor),
-            mock(Configuration.class));
+            createMockConfiguration());
         final ExecutorRepresenter executorRepresenter =
             containerManager.onContainerLaunched(createMockContext(executorId, descriptor)).get();
         assertEquals(spec.getContainerType(), executorRepresenter.getContainerType());
@@ -125,7 +125,7 @@ public final class ContainerManagerTest {
     containerManager.onContainerAllocated(
         getExecutorId(),
         createMockEvaluator(evaluatorId, createDescriptor(RESOURCE_SPEC_A)),
-        mock(Configuration.class));
+        createMockConfiguration());
     assertEquals(RESOURCE_SPEC_A, containerManager.onContainerFailed(evaluatorId));
   }
 
@@ -139,7 +139,7 @@ public final class ContainerManagerTest {
     containerManager.onContainerAllocated(
         executorId,
         createMockEvaluator(evaluatorId, descriptor),
-        mock(Configuration.class));
+        createMockConfiguration());
     containerManager.onContainerLaunched(createMockContext(executorId, descriptor));
     assertEquals(RESOURCE_SPEC_A, containerManager.onContainerFailed(evaluatorId));
   }
@@ -170,4 +170,8 @@ public final class ContainerManagerTest {
     when(mockedContext.getEvaluatorDescriptor()).thenReturn(descriptor);
     return mockedContext;
   }
+
+  private Configuration createMockConfiguration() {
+    return Tang.Factory.getTang().newConfigurationBuilder().build();
+  }
 }
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRestartTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
similarity index 99%
rename from runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRestartTest.java
rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
index 7064a23..4b6af1a 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRestartTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -59,10 +59,10 @@ import static org.mockito.Mockito.mock;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({BlockManagerMaster.class, SchedulerRunner.class, SchedulingConstraintRegistry.class,
     PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, MetricMessageHandler.class})
-public final class TaskRestartTest {
+public final class TaskRetryTest {
   @Rule public TestName testName = new TestName();
 
-  private static final Logger LOG = LoggerFactory.getLogger(TaskRestartTest.class.getName());
+  private static final Logger LOG = LoggerFactory.getLogger(TaskRetryTest.class.getName());
   private static final AtomicInteger ID_OFFSET = new AtomicInteger(1);
 
   private Random random;


Mime
View raw message