nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeongy...@apache.org
Subject [incubator-nemo] branch 717-TGE updated: address comments
Date Thu, 10 May 2018 05:27:18 GMT
This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch 717-TGE
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/717-TGE by this push:
     new 5e7f808  address comments
5e7f808 is described below

commit 5e7f808a01abcfe9737c7a3f111e6d57825ee135
Author: Jeongyoon Eo <jeongyoon0807@gmail.com>
AuthorDate: Thu May 10 14:27:03 2018 +0900

    address comments
---
 .../frontend/spark/transform/CollectTransform.java |  9 ++-
 .../optimizer/examples/EmptyComponents.java        |  2 +-
 .../beam/policy/DataSkewPolicyParallelismFive.java |  2 +-
 .../nemo/runtime/executor/TaskGroupExecutor.java   | 11 ++--
 .../executor/datatransfer/OutputCollectorImpl.java | 20 +++++++
 .../executor/datatransfer/OutputWriter.java        |  2 +-
 .../executor/datatransfer/TaskDataHandler.java     | 67 ++++++++++++++--------
 7 files changed, 74 insertions(+), 39 deletions(-)

diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
index 252056f..702e3bd 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
@@ -30,8 +30,6 @@ import java.util.List;
  */
 public final class CollectTransform<T> implements Transform<T, T> {
   private String filename;
-  private FileOutputStream fos;
-  private ObjectOutputStream oos;
   private final List<T> list;
 
   /**
@@ -58,9 +56,10 @@ public final class CollectTransform<T> implements Transform<T, T> {
 
   @Override
   public void close() {
-    try {
-      fos = new FileOutputStream(filename);
-      oos = new ObjectOutputStream(fos);
+    try (
+        FileOutputStream fos = new FileOutputStream(filename);
+        ObjectOutputStream oos = new ObjectOutputStream(fos)
+    ) {
       oos.writeObject(list);
       oos.close();
     } catch (Exception e) {
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
index f82241e..257b757 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
@@ -59,7 +59,7 @@ public class EmptyComponents {
     }
 
     @Override
-    public void onData(final Object element) {
+    public void onData(final I element) {
     }
 
     @Override
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/DataSkewPolicyParallelismFive.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/DataSkewPolicyParallelismFive.java
index 731df1f..80de5fa 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/DataSkewPolicyParallelismFive.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/DataSkewPolicyParallelismFive.java
@@ -25,7 +25,7 @@ import java.util.List;
 /**
  * A data-skew policy with fixed parallelism 5 for tests.
  */
-public final class DataSkewPolicyParallelismFive implements Policy{
+public final class DataSkewPolicyParallelismFive implements Policy {
   private final Policy policy;
 
   public DataSkewPolicyParallelismFive() {
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
index 2e974f7..546f69b 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
@@ -168,7 +168,7 @@ public final class TaskGroupExecutor {
       outEdgesToOtherStages.forEach(physicalStageEdge -> {
         final OutputWriter outputWriter = channelFactory.createWriter(
             task, taskGroupIdx, physicalStageEdge.getDstVertex(), physicalStageEdge);
-        dataHandler.addOutputWriters(outputWriter);
+        dataHandler.addOutputWriter(outputWriter);
       });
 
       // Add InputPipes for intra-stage data transfer
@@ -331,16 +331,15 @@ public final class TaskGroupExecutor {
         } catch (final BlockFetchException ex) {
           taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_RECOVERABLE,
               Optional.empty(), Optional.of(TaskGroupState.RecoverableFailureCause.INPUT_READ_FAILURE));
-          LOG.info("{} Execution Failed (Recoverable: input read failure)! Exception: {}",
+          LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}",
               taskGroupId, ex.toString());
         } catch (final Exception e) {
           taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_UNRECOVERABLE,
               Optional.empty(), Optional.empty());
-          LOG.info("{} Execution Failed! Exception: {}", taskGroupId, e.toString());
+          LOG.error("{} Execution Failed! Exception: {}", taskGroupId, e.toString());
           throw new RuntimeException(e);
         }
       }
-      // TODO #XXX: Support other types of source tasks, i. e. InitializedSourceTask
     });
   }
 
@@ -613,12 +612,12 @@ public final class TaskGroupExecutor {
     } catch (final BlockWriteException ex2) {
       taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_RECOVERABLE,
           Optional.empty(), Optional.of(TaskGroupState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
-      LOG.info("{} Execution Failed (Recoverable: output write failure)! Exception: {}",
+      LOG.error("{} Execution Failed (Recoverable: output write failure)! Exception: {}",
           taskGroupId, ex2.toString());
     } catch (final Exception e) {
       taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_UNRECOVERABLE,
           Optional.empty(), Optional.empty());
-      LOG.info("{} Execution Failed! Exception: {}",
+      LOG.error("{} Execution Failed! Exception: {}",
           taskGroupId, e.toString());
       throw new RuntimeException(e);
     }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
index 5720d2a..62cbc22 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
@@ -61,10 +61,20 @@ public final class OutputCollectorImpl<O> implements OutputCollector<O> {
     return outputQueue.remove();
   }
 
+  /**
+   * Check if this OutputCollector is empty.
+   *
+   * @return true if this OutputCollector is empty.
+   */
   public boolean isEmpty() {
     return outputQueue.isEmpty();
   }
 
+  /**
+   * Return the size of this OutputCollector.
+   *
+   * @return the total number of elements in this OutputCollector.
+   */
   public int size() {
     return outputQueue.size();
   }
@@ -79,6 +89,11 @@ public final class OutputCollectorImpl<O> implements OutputCollector<O> {
     sideInputRuntimeEdge = edge;
   }
 
+  /**
+   * Get the RuntimeEdge marked as side input.
+   *
+   * @return the RuntimeEdge marked as side input.
+   */
   public RuntimeEdge getSideInputRuntimeEdge() {
     return sideInputRuntimeEdge;
   }
@@ -92,6 +107,11 @@ public final class OutputCollectorImpl<O> implements OutputCollector<O> {
     sideInputReceivers.add(physicalTaskId);
   }
 
+  /**
+   * Check if this OutputCollector has side input for the given child task.
+   *
+   * @return true if it contains side input for child task of the given id.
+   */
   public boolean hasSideInputFor(final String physicalTaskId) {
     return sideInputReceivers.contains(physicalTaskId);
   }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index f3d38ad..86043e4 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -71,7 +71,7 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
   /**
    * Collects output element-wise in memory.
    *
-   * * @param element
+   * @param element
    */
   public void writeElement(final Object element) {
     outputList.add(element);
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java
index 255fa9e..523d837 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java
@@ -16,8 +16,6 @@
 package edu.snu.nemo.runtime.executor.datatransfer;
 
 import edu.snu.nemo.runtime.common.plan.physical.Task;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -29,8 +27,19 @@ import java.util.List;
  * through the DAG of children TaskDataHandlers.
  */
 public final class TaskDataHandler {
-  private static final Logger LOG = LoggerFactory.getLogger(TaskDataHandler.class.getName());
+  private final Task task;
+  private List<TaskDataHandler> children;
+  private final List<OutputCollectorImpl> inputFromThisStage;
+  private final List<InputReader> sideInputFromOtherStages;
+  private final List<OutputCollectorImpl> sideInputFromThisStage;
+  private OutputCollectorImpl outputCollector;
+  private final List<OutputWriter> outputWriters;
 
+  /**
+   * TaskDataHandler Constructor.
+   *
+   * @param task Task of this TaskDataHandler.
+   */
   public TaskDataHandler(final Task task) {
     this.task = task;
     this.children = new ArrayList<>();
@@ -41,34 +50,27 @@ public final class TaskDataHandler {
     this.outputWriters = new ArrayList<>();
   }
 
-  private final Task task;
-  private List<TaskDataHandler> children;
-  private final List<OutputCollectorImpl> inputFromThisStage;
-  private final List<InputReader> sideInputFromOtherStages;
-  private final List<OutputCollectorImpl> sideInputFromThisStage;
-  private OutputCollectorImpl outputCollector;
-  private final List<OutputWriter> outputWriters;
-
+  /**
+   * Get the task that owns this TaskDataHandler.
+   *
+   * @return task of this TaskDataHandler.
+   */
   public Task getTask() {
     return task;
   }
 
-  public List<TaskDataHandler> getChildren() {
-    return children;
-  }
-
   /**
-   * Get intra-TaskGroup input from parent tasks.
-   * We keep parent tasks' OutputCollectors, as they're the place where parent task output
-   * becomes available element-wise.
-   * @return OutputCollectors of all parent tasks.
+   * Get a DAG of children tasks' TaskDataHandlers.
+   *
+   * @return DAG of children tasks' TaskDataHandlers.
    */
-  public List<OutputCollectorImpl> getInputFromThisStage() {
-    return inputFromThisStage;
+  public List<TaskDataHandler> getChildren() {
+    return children;
   }
 
   /**
    * Get side input from other TaskGroup.
+   *
    * @return InputReader that has side input.
    */
   public List<InputReader> getSideInputFromOtherStages() {
@@ -79,6 +81,7 @@ public final class TaskDataHandler {
    * Get intra-TaskGroup side input from parent tasks.
    * Just like normal intra-TaskGroup inputs, intra-TaskGroup side inputs are
    * collected in parent tasks' OutputCollectors.
+   *
    * @return OutputCollectors of all parent tasks which are marked as having side input.
    */
   public List<OutputCollectorImpl> getSideInputFromThisStage() {
@@ -87,6 +90,7 @@ public final class TaskDataHandler {
 
   /**
    * Get OutputCollector of this task.
+   *
    * @return OutputCollector of this task.
    */
   public OutputCollectorImpl getOutputCollector() {
@@ -95,6 +99,7 @@ public final class TaskDataHandler {
 
   /**
    * Get OutputWriters of this task.
+   *
    * @return OutputWriters of this task.
    */
   public List<OutputWriter> getOutputWriters() {
@@ -103,27 +108,35 @@ public final class TaskDataHandler {
 
   /**
    * Set a DAG of children tasks' DataHandlers.
+   *
+   * @param childrenDataHandler list of children TaskDataHandlers.
    */
   public void setChildrenDataHandler(final List<TaskDataHandler> childrenDataHandler) {
     children = childrenDataHandler;
   }
 
   /**
-   * Add OutputCollector of a parent task.
+   * Add OutputCollector of a parent task that will provide intra-stage input.
+   *
+   * @param input OutputCollector of a parent task.
    */
   public void addInputFromThisStages(final OutputCollectorImpl input) {
     inputFromThisStage.add(input);
   }
 
   /**
-   * Add InputReader that contains side input from other TaskGroup.
+   * Add InputReader that will provide inter-stage side input.
+   *
+   * @param sideInputReader InputReader that will provide inter-stage side input.
    */
   public void addSideInputFromOtherStages(final InputReader sideInputReader) {
     sideInputFromOtherStages.add(sideInputReader);
   }
 
   /**
-   * Add parent OutputCollector that contains side input from the parent task.
+   * Add OutputCollector of a parent task that will provide intra-stage side input.
+   *
+   * @param ocAsSideInput OutputCollector of a parent task with side input.
    */
   public void addSideInputFromThisStage(final OutputCollectorImpl ocAsSideInput) {
     sideInputFromThisStage.add(ocAsSideInput);
@@ -131,6 +144,8 @@ public final class TaskDataHandler {
 
   /**
    * Set OutputCollector of this task.
+   *
+   * @param oc OutputCollector of this task.
    */
   public void setOutputCollector(final OutputCollectorImpl oc) {
     outputCollector = oc;
@@ -138,8 +153,10 @@ public final class TaskDataHandler {
 
   /**
    * Add OutputWriter of this task.
+   *
+   * @param outputWriter OutputWriter of this task.
    */
-  public void addOutputWriters(final OutputWriter outputWriter) {
+  public void addOutputWriter(final OutputWriter outputWriter) {
     outputWriters.add(outputWriter);
   }
 }

-- 
To stop receiving notification emails like this one, please contact
jeongyoon@apache.org.

Mime
View raw message