nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeongy...@apache.org
Subject [incubator-nemo] branch 717-TGE updated: PipeImpl to LocalPipe
Date Tue, 06 Mar 2018 06:22:17 GMT
This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch 717-TGE
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/717-TGE by this push:
     new 7bde45c  PipeImpl to LocalPipe
7bde45c is described below

commit 7bde45c7a477c3dd372c1af0aac88a9db4ce41e8
Author: Jeongyoon Eo <jeongyoon0807@gmail.com>
AuthorDate: Tue Mar 6 15:21:04 2018 +0900

    PipeImpl to LocalPipe
---
 .../nemo/runtime/executor/TaskGroupExecutor.java   | 26 +++++++++++-----------
 .../datatransfer/{PipeImpl.java => LocalPipe.java} | 18 +++++++--------
 2 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
index 9da4c23..c5bf3dc 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
@@ -56,8 +56,8 @@ public final class TaskGroupExecutor {
   private final MetricCollector metricCollector;
 
   // Map of task ID to its intra-TaskGroup data pipe.
-  private final Map<Task, List<PipeImpl>> taskToInputPipesMap;
-  private final Map<Task, PipeImpl> taskToOutputPipeMap;  // one and only one Pipe per task
+  private final Map<Task, List<LocalPipe>> taskToInputPipesMap;
+  private final Map<Task, LocalPipe> taskToOutputPipeMap;  // one and only one Pipe per task
   // Readers/writers that deals with inter-TaskGroup data.
   private final List<InputReader> inputReaders;
   private final Map<Task, List<InputReader>> taskToInputReadersMap;
@@ -208,13 +208,13 @@ public final class TaskGroupExecutor {
    * @param task the Task to add input pipes to.
    */
   private void addInputPipe(final Task task) {
-    List<PipeImpl> inputPipes = new ArrayList<>();
+    List<LocalPipe> inputPipes = new ArrayList<>();
     List<Task> parentTasks = taskGroupDag.getParents(task.getId());
     final String physicalTaskId = getPhysicalTaskId(task.getId());
 
     if (parentTasks != null) {
       parentTasks.forEach(parent -> {
-        final PipeImpl parentOutputPipe = taskToOutputPipeMap.get(parent);
+        final LocalPipe parentOutputPipe = taskToOutputPipeMap.get(parent);
         inputPipes.add(parentOutputPipe);
         LOG.info("log: Added Outputpipe of {} as InputPipe of {} {}",
             getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId);
@@ -232,7 +232,7 @@ public final class TaskGroupExecutor {
    * @param task the Task to add output pipes to.
    */
   private void addOutputPipe(final Task task) {
-    final PipeImpl outputPipe = new PipeImpl();
+    final LocalPipe outputPipe = new LocalPipe();
     final String physicalTaskId = getPhysicalTaskId(task.getId());
     final List<RuntimeEdge<Task>> outEdges = taskGroupDag.getOutgoingEdgesOf(task);
 
@@ -359,7 +359,7 @@ public final class TaskGroupExecutor {
     srcIteratorIdToTasksMap.values().forEach(tasks ->
         tasks.forEach(task -> {
           final List<Task> dstTasks = taskGroupDag.getChildren(task.getId());
-          PipeImpl pipe = taskToOutputPipeMap.get(task);
+          LocalPipe pipe = taskToOutputPipeMap.get(task);
           pipeIdToDstTasksMap.putIfAbsent(pipe.getId(), dstTasks);
           LOG.info("{} pipeIdToDstTasksMap: [{}'s OutputPipe, {}]",
               taskGroupId, getPhysicalTaskId(task.getId()), dstTasks);
@@ -367,7 +367,7 @@ public final class TaskGroupExecutor {
     iteratorIdToTasksMap.values().forEach(tasks ->
         tasks.forEach(task -> {
           final List<Task> dstTasks = taskGroupDag.getChildren(task.getId());
-          PipeImpl pipe = taskToOutputPipeMap.get(task);
+          LocalPipe pipe = taskToOutputPipeMap.get(task);
           pipeIdToDstTasksMap.putIfAbsent(pipe.getId(), dstTasks);
           LOG.info("{} pipeIdToDstTasksMap: [{}'s OutputPipe, {}]",
               taskGroupId, getPhysicalTaskId(task.getId()), dstTasks);
@@ -381,7 +381,7 @@ public final class TaskGroupExecutor {
     currentMap.values().forEach(tasks ->
         tasks.forEach(task -> {
           final List<Task> dstTasks = taskGroupDag.getChildren(task.getId());
-          PipeImpl pipe = taskToOutputPipeMap.get(task);
+          LocalPipe pipe = taskToOutputPipeMap.get(task);
           updatedMap.putIfAbsent(pipe.getId(), dstTasks);
           LOG.info("{} pipeIdToDstTasksMap: [{}, {}]",
               taskGroupId, getPhysicalTaskId(task.getId()), dstTasks);
@@ -466,7 +466,7 @@ public final class TaskGroupExecutor {
       }
 
       final Transform.Context transformContext = new ContextImpl(sideInputMap);
-      final PipeImpl outputPipe = taskToOutputPipeMap.get(task);
+      final LocalPipe outputPipe = taskToOutputPipeMap.get(task);
       transform.prepare(transformContext, outputPipe);
 
       preparedTransforms.add(transform);
@@ -555,7 +555,7 @@ public final class TaskGroupExecutor {
       initializePipeToDstTasksMap();
       while (!finishedAllTasks()) {
         pipeIdToDstTasksMap.forEach((pipeId, dstTasks) -> {
-          PipeImpl pipe = taskToOutputPipeMap.values().stream()
+          LocalPipe pipe = taskToOutputPipeMap.values().stream()
               .filter(p -> p.getId() == pipeId)
               .findFirst().get();
 
@@ -636,7 +636,7 @@ public final class TaskGroupExecutor {
 
     // Process element-wise depending on the Task type
     if (task instanceof BoundedSourceTask) {
-      PipeImpl pipe = taskToOutputPipeMap.get(task);
+      LocalPipe pipe = taskToOutputPipeMap.get(task);
 
       if (data.contains(null)) {  // data is [null] used for VoidCoders
         pipe.emit(data);
@@ -662,7 +662,7 @@ public final class TaskGroupExecutor {
         transform.onData(dataElement);
       });
     } else if (task instanceof MetricCollectionBarrierTask) {
-      PipeImpl pipe = taskToOutputPipeMap.get(task);
+      LocalPipe pipe = taskToOutputPipeMap.get(task);
 
       if (data.contains(null)) {  // data is [null] used for VoidCoders
         pipe.emit(data);
@@ -677,7 +677,7 @@ public final class TaskGroupExecutor {
     }
 
     // For the produced output
-    PipeImpl pipe = taskToOutputPipeMap.get(task);
+    LocalPipe pipe = taskToOutputPipeMap.get(task);
     while (!pipe.isEmpty()) {
       final Object element = pipe.remove();
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/PipeImpl.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/LocalPipe.java
similarity index 80%
rename from runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/PipeImpl.java
rename to runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/LocalPipe.java
index fbcc662..05ec60e 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/PipeImpl.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/LocalPipe.java
@@ -26,14 +26,14 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * Pipe implementation that requires synchronization.
+ * Local pipe implementation.
  *
  * @param <O> output type.
  */
-public final class PipeImpl<O> implements Pipe<O> {
-  private static final Logger LOG = LoggerFactory.getLogger(PipeImpl.class.getName());
-  private static final String PIPEID_PREFIX = "PIPE_";
-  private static final AtomicInteger PIPEID_GENERATOR = new AtomicInteger(0);
+public final class LocalPipe<O> implements Pipe<O> {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalPipe.class.getName());
+  private static final String LOCALPIPEID_PREFIX = "LOCALPIPE_";
+  private static final AtomicInteger LOCALPIPEID_GENERATOR = new AtomicInteger(0);
 
   private final String id;
   private final ArrayDeque<O> outputQueue;
@@ -43,8 +43,8 @@ public final class PipeImpl<O> implements Pipe<O> {
   /**
    * Constructor of a new Pipe.
    */
-  public PipeImpl() {
-    this.id = PIPEID_PREFIX + PIPEID_GENERATOR.getAndIncrement();
+  public LocalPipe() {
+    this.id = LOCALPIPEID_PREFIX + LOCALPIPEID_GENERATOR.getAndIncrement();
     this.outputQueue = new ArrayDeque<>();
     this.sideInputRuntimeEdge = null;
     this.sideInputReceivers = new ArrayList<>();
@@ -57,11 +57,11 @@ public final class PipeImpl<O> implements Pipe<O> {
 
   @Override
   public void emit(final String dstVertexId, final Object output) {
-    throw new UnsupportedOperationException("emit(dstVertexId, output) in PipeImpl.");
+    throw new UnsupportedOperationException("emit(dstVertexId, output) in LocalPipe.");
   }
 
   /**
-   * Inter-Task data is transferred from sender-side Task's PipeImpl to receiver-side Task.
+   * Inter-Task data is transferred from sender-side Task's LocalPipe to receiver-side Task.
    *
    * @return the first element of this list
    */

-- 
To stop receiving notification emails like this one, please contact
jeongyoon@apache.org.

Mime
View raw message