nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeongy...@apache.org
Subject [incubator-nemo] branch 717-TGE updated: address comments
Date Fri, 16 Mar 2018 06:40:47 GMT
This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch 717-TGE
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/717-TGE by this push:
     new aa0b7ce  address comments
aa0b7ce is described below

commit aa0b7cea6c15a2f58aed4738576ba4788139cf5d
Author: Jeongyoon Eo <jeongyoon0807@gmail.com>
AuthorDate: Fri Mar 16 15:40:33 2018 +0900

    address comments
---
 .../beam/transform/CreateViewTransform.java        |  4 ++--
 .../frontend/beam/transform/DoTransform.java       |  4 ++--
 .../frontend/beam/transform/FlattenTransform.java  |  4 ++--
 .../beam/transform/GroupByKeyTransform.java        |  4 ++--
 .../frontend/beam/transform/WindowTransform.java   |  4 ++--
 .../frontend/spark/transform/CollectTransform.java |  2 +-
 .../frontend/spark/transform/FlatMapTransform.java |  4 ++--
 .../spark/transform/GroupByKeyTransform.java       |  4 ++--
 .../spark/transform/MapToPairTransform.java        |  4 ++--
 .../frontend/spark/transform/MapTransform.java     |  4 ++--
 .../spark/transform/ReduceByKeyTransform.java      |  8 ++++----
 .../frontend/spark/transform/ReduceTransform.java  |  4 ++--
 .../nemo/runtime/executor/TaskGroupExecutor.java   |  7 ++-----
 .../executor/datatransfer/OutputWriter.java        |  5 +++++
 .../executor/datatransfer/TaskDataHandler.java     | 23 ++++++++++++++++++++++
 15 files changed, 55 insertions(+), 30 deletions(-)

diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
index 9f950ec..b342595 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
@@ -46,8 +46,8 @@ public final class CreateViewTransform<I, O> implements Transform<I,
O> {
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<O> p) {
-    this.outputCollector = p;
+  public void prepare(final Context context, final OutputCollector<O> oc) {
+    this.outputCollector = oc;
   }
 
   @Override
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java
index 9fdd1a3..e099253 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java
@@ -70,8 +70,8 @@ public final class DoTransform<I, O> implements Transform<I, O>
{
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<O> p) {
-    this.outputCollector = p;
+  public void prepare(final Context context, final OutputCollector<O> oc) {
+    this.outputCollector = oc;
     this.sideInputs = new HashMap<>();
     context.getSideInputs().forEach((k, v) -> this.sideInputs.put(((CreateViewTransform) k).getTag(), v));
     this.startBundleContext = new StartBundleContext(doFn, serializedOptions);
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/FlattenTransform.java
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/FlattenTransform.java
index 43c3f2f..e58de53 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/FlattenTransform.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/FlattenTransform.java
@@ -36,8 +36,8 @@ public final class FlattenTransform<T> implements Transform<T,
T> {
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<T> p) {
-    this.outputCollector = p;
+  public void prepare(final Context context, final OutputCollector<T> oc) {
+    this.outputCollector = oc;
   }
 
   @Override
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
index 32126fe..6114729 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
@@ -40,8 +40,8 @@ public final class GroupByKeyTransform<I> implements Transform<I,
KV<Object, Lis
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<KV<Object, List>> p) {
-    this.outputCollector = p;
+  public void prepare(final Context context, final OutputCollector<KV<Object, List>> oc) {
+    this.outputCollector = oc;
   }
 
   @Override
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/WindowTransform.java
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/WindowTransform.java
index 13c0770..1b89b64 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/WindowTransform.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/WindowTransform.java
@@ -38,8 +38,8 @@ public final class WindowTransform<T> implements Transform<T, T>
{
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<T> p) {
-    this.outputCollector = p;
+  public void prepare(final Context context, final OutputCollector<T> oc) {
+    this.outputCollector = oc;
   }
 
   @Override
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
index 5f8efe5..252056f 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
@@ -45,7 +45,7 @@ public final class CollectTransform<T> implements Transform<T,
T> {
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<T> p) {
+  public void prepare(final Context context, final OutputCollector<T> oc) {
     this.filename = filename + JavaRDD.getResultId();
   }
 
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
index 7100623..406f5db 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
@@ -37,8 +37,8 @@ public final class FlatMapTransform<T, U> implements Transform<T,
U> {
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<U> p) {
-    this.outputCollector = p;
+  public void prepare(final Context context, final OutputCollector<U> oc) {
+    this.outputCollector = oc;
   }
 
   @Override
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java
index f2240fb..0703e81 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java
@@ -38,8 +38,8 @@ public final class GroupByKeyTransform<K, V> implements Transform<Tuple2<K,
V>,
   }
 
   @Override
-  public void prepare(final Transform.Context context, final OutputCollector<Tuple2<K, Iterable<V>>> p) {
-    this.outputCollector = p;
+  public void prepare(final Transform.Context context, final OutputCollector<Tuple2<K, Iterable<V>>> oc) {
+    this.outputCollector = oc;
   }
 
   @Override
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
index 9359c42..878ffe5 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
@@ -39,8 +39,8 @@ public final class MapToPairTransform<T, K, V> implements Transform<T,
Tuple2<K,
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<Tuple2<K, V>> p) {
-    this.outputCollector = p;
+  public void prepare(final Context context, final OutputCollector<Tuple2<K, V>> oc) {
+    this.outputCollector = oc;
   }
 
   @Override
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/MapTransform.java
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/MapTransform.java
index c5fa026..976eb50 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/MapTransform.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/MapTransform.java
@@ -37,8 +37,8 @@ public final class MapTransform<I, O> implements Transform<I, O>
{
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<O> p) {
-    this.outputCollector = p;
+  public void prepare(final Context context, final OutputCollector<O> oc) {
+    this.outputCollector = oc;
   }
 
   public void onData(final I element) {
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
index ef5b3a8..176ff77 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
@@ -46,14 +46,14 @@ public final class ReduceByKeyTransform<K, V> implements Transform<Tuple2<K,
V>,
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<Tuple2<K, V>> p) {
-    this.outputCollector = p;
+  public void prepare(final Context context, final OutputCollector<Tuple2<K, V>> oc) {
+    this.outputCollector = oc;
   }
 
   @Override
   public void onData(final Tuple2<K, V> element) {
-    K key = element._1;
-    V value = element._2;
+    final K key = element._1;
+    final V value = element._2;
 
     keyToValues.putIfAbsent(key, new ArrayList<>());
     keyToValues.get(key).add(value);
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java
index 7f2ae0e..e22d125 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java
@@ -49,8 +49,8 @@ public final class ReduceTransform<T> implements Transform<T, T>
{
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<T> p) {
-    this.outputCollector = p;
+  public void prepare(final Context context, final OutputCollector<T> oc) {
+    this.outputCollector = oc;
   }
 
   @Override
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
index 7aaf8bf..7595b43 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
@@ -34,11 +34,9 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Executes a task group.
  */
@@ -313,7 +311,7 @@ public final class TaskGroupExecutor {
   }
 
   private void prepareInputFromOtherStages() {
-    inputReaders.stream().forEach(inputReader -> {
+    inputReaderToDataHandlersMap.forEach((inputReader, dataHandlers) -> {
       final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = inputReader.read();
       numPartitions += futures.size();
 
@@ -327,8 +325,7 @@ public final class TaskGroupExecutor {
         if (iteratorIdToDataHandlersMap.containsKey(iteratorId)) {
           throw new RuntimeException("iteratorIdToDataHandlersMap already contains " + iteratorId);
         } else {
-          iteratorIdToDataHandlersMap.computeIfAbsent(iteratorId,
-              absentIteratorId -> inputReaderToDataHandlersMap.get(inputReader));
+          iteratorIdToDataHandlersMap.computeIfAbsent(iteratorId, absentIteratorId -> dataHandlers);
           try {
             partitionQueue.put(Pair.of(iteratorId, iterator));
           } catch (InterruptedException e) {
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index 2f647aa..f3d38ad 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -68,6 +68,11 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable
{
     blockManagerWorker.createBlock(blockId, blockStoreValue);
   }
 
+  /**
+   * Collects output element-wise in memory.
+   *
+   * * @param element
+   */
   public void writeElement(final Object element) {
     outputList.add(element);
   }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java
index 2265f69..2e5fdae 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java
@@ -24,6 +24,9 @@ import java.util.List;
 
 /**
  * Per-Task data handler.
+ * This is a wrapper class that handles data transfer of a Task.
+ * As TaskGroup input is processed element-wise, Task output element percolates down
+ * through the DAG of children TaskDataHandlers.
  */
 public final class TaskDataHandler {
   private static final Logger LOG = LoggerFactory.getLogger(TaskDataHandler.class.getName());
@@ -54,18 +57,38 @@ public final class TaskDataHandler {
     return children;
   }
 
+  /**
+   * Get intra-TaskGroup input from parent tasks.
+   * We keep parent tasks' OutputCollectors, as they're the place where parent task output
+   * becomes available element-wise.
+   * @return OutputCollectors of all parent tasks.
+   */
   public List<OutputCollectorImpl> getInputFromThisStage() {
     return inputFromThisStage;
   }
 
+  /**
+   * Get side input from other TaskGroup.
+   * @return InputReader that has side input.
+   */
   public List<InputReader> getSideInputFromOtherStages() {
     return sideInputFromOtherStages;
   }
 
+  /**
+   * Get intra-TaskGroup side input from parent tasks.
+   * Just like normal intra-TaskGroup inputs, intra-TaskGroup side inputs are
+   * collected in parent tasks' OutputCollectors.
+   * @return OutputCollectors of all parent tasks which are marked as having side input.
+   */
   public List<OutputCollectorImpl> getSideInputFromThisStage() {
     return sideInputFromThisStage;
   }
 
+  /**
+   * Get OutputCollector
+   * @return
+   */
   public OutputCollectorImpl getOutputCollector() {
     return outputCollector;
   }

-- 
To stop receiving notification emails like this one, please contact
jeongyoon@apache.org.

Mime
View raw message