From commits-return-80-archive-asf-public=cust-asf.ponee.io@nemo.apache.org Tue Mar 6 07:22:20 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 86251180652 for ; Tue, 6 Mar 2018 07:22:19 +0100 (CET) Received: (qmail 61741 invoked by uid 500); 6 Mar 2018 06:22:18 -0000 Mailing-List: contact commits-help@nemo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nemo.apache.org Delivered-To: mailing list commits@nemo.apache.org Received: (qmail 61732 invoked by uid 99); 6 Mar 2018 06:22:18 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 Mar 2018 06:22:18 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id D54CA82667; Tue, 6 Mar 2018 06:22:17 +0000 (UTC) Date: Tue, 06 Mar 2018 06:22:17 +0000 To: "commits@nemo.apache.org" Subject: [incubator-nemo] branch 717-TGE updated: PipeImpl to LocalPipe MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <152031733782.29566.15946103699435897998@gitbox.apache.org> From: jeongyoon@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-nemo X-Git-Refname: refs/heads/717-TGE X-Git-Reftype: branch X-Git-Oldrev: 7d261e92cf2d06a0a11faeff3f6e82bf75822060 X-Git-Newrev: 7bde45c7a477c3dd372c1af0aac88a9db4ce41e8 X-Git-Rev: 7bde45c7a477c3dd372c1af0aac88a9db4ce41e8 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. jeongyoon pushed a commit to branch 717-TGE in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git The following commit(s) were added to refs/heads/717-TGE by this push: new 7bde45c PipeImpl to LocalPipe 7bde45c is described below commit 7bde45c7a477c3dd372c1af0aac88a9db4ce41e8 Author: Jeongyoon Eo AuthorDate: Tue Mar 6 15:21:04 2018 +0900 PipeImpl to LocalPipe --- .../nemo/runtime/executor/TaskGroupExecutor.java | 26 +++++++++++----------- .../datatransfer/{PipeImpl.java => LocalPipe.java} | 18 +++++++-------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java index 9da4c23..c5bf3dc 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java @@ -56,8 +56,8 @@ public final class TaskGroupExecutor { private final MetricCollector metricCollector; // Map of task ID to its intra-TaskGroup data pipe. - private final Map> taskToInputPipesMap; - private final Map taskToOutputPipeMap; // one and only one Pipe per task + private final Map> taskToInputPipesMap; + private final Map taskToOutputPipeMap; // one and only one Pipe per task // Readers/writers that deals with inter-TaskGroup data. private final List inputReaders; private final Map> taskToInputReadersMap; @@ -208,13 +208,13 @@ public final class TaskGroupExecutor { * @param task the Task to add input pipes to. */ private void addInputPipe(final Task task) { - List inputPipes = new ArrayList<>(); + List inputPipes = new ArrayList<>(); List parentTasks = taskGroupDag.getParents(task.getId()); final String physicalTaskId = getPhysicalTaskId(task.getId()); if (parentTasks != null) { parentTasks.forEach(parent -> { - final PipeImpl parentOutputPipe = taskToOutputPipeMap.get(parent); + final LocalPipe parentOutputPipe = taskToOutputPipeMap.get(parent); inputPipes.add(parentOutputPipe); LOG.info("log: Added Outputpipe of {} as InputPipe of {} {}", getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId); @@ -232,7 +232,7 @@ public final class TaskGroupExecutor { * @param task the Task to add output pipes to. */ private void addOutputPipe(final Task task) { - final PipeImpl outputPipe = new PipeImpl(); + final LocalPipe outputPipe = new LocalPipe(); final String physicalTaskId = getPhysicalTaskId(task.getId()); final List> outEdges = taskGroupDag.getOutgoingEdgesOf(task); @@ -359,7 +359,7 @@ public final class TaskGroupExecutor { srcIteratorIdToTasksMap.values().forEach(tasks -> tasks.forEach(task -> { final List dstTasks = taskGroupDag.getChildren(task.getId()); - PipeImpl pipe = taskToOutputPipeMap.get(task); + LocalPipe pipe = taskToOutputPipeMap.get(task); pipeIdToDstTasksMap.putIfAbsent(pipe.getId(), dstTasks); LOG.info("{} pipeIdToDstTasksMap: [{}'s OutputPipe, {}]", taskGroupId, getPhysicalTaskId(task.getId()), dstTasks); @@ -367,7 +367,7 @@ public final class TaskGroupExecutor { iteratorIdToTasksMap.values().forEach(tasks -> tasks.forEach(task -> { final List dstTasks = taskGroupDag.getChildren(task.getId()); - PipeImpl pipe = taskToOutputPipeMap.get(task); + LocalPipe pipe = taskToOutputPipeMap.get(task); pipeIdToDstTasksMap.putIfAbsent(pipe.getId(), dstTasks); LOG.info("{} pipeIdToDstTasksMap: [{}'s OutputPipe, {}]", taskGroupId, getPhysicalTaskId(task.getId()), dstTasks); @@ -381,7 +381,7 @@ public final class TaskGroupExecutor { currentMap.values().forEach(tasks -> tasks.forEach(task -> { final List dstTasks = taskGroupDag.getChildren(task.getId()); - PipeImpl pipe = taskToOutputPipeMap.get(task); + LocalPipe pipe = taskToOutputPipeMap.get(task); updatedMap.putIfAbsent(pipe.getId(), dstTasks); LOG.info("{} pipeIdToDstTasksMap: [{}, {}]", taskGroupId, getPhysicalTaskId(task.getId()), dstTasks); @@ -466,7 +466,7 @@ public final class TaskGroupExecutor { } final Transform.Context transformContext = new ContextImpl(sideInputMap); - final PipeImpl outputPipe = taskToOutputPipeMap.get(task); + final LocalPipe outputPipe = taskToOutputPipeMap.get(task); transform.prepare(transformContext, outputPipe); preparedTransforms.add(transform); @@ -555,7 +555,7 @@ public final class TaskGroupExecutor { initializePipeToDstTasksMap(); while (!finishedAllTasks()) { pipeIdToDstTasksMap.forEach((pipeId, dstTasks) -> { - PipeImpl pipe = taskToOutputPipeMap.values().stream() + LocalPipe pipe = taskToOutputPipeMap.values().stream() .filter(p -> p.getId() == pipeId) .findFirst().get(); @@ -636,7 +636,7 @@ public final class TaskGroupExecutor { // Process element-wise depending on the Task type if (task instanceof BoundedSourceTask) { - PipeImpl pipe = taskToOutputPipeMap.get(task); + LocalPipe pipe = taskToOutputPipeMap.get(task); if (data.contains(null)) { // data is [null] used for VoidCoders pipe.emit(data); @@ -662,7 +662,7 @@ public final class TaskGroupExecutor { transform.onData(dataElement); }); } else if (task instanceof MetricCollectionBarrierTask) { - PipeImpl pipe = taskToOutputPipeMap.get(task); + LocalPipe pipe = taskToOutputPipeMap.get(task); if (data.contains(null)) { // data is [null] used for VoidCoders pipe.emit(data); @@ -677,7 +677,7 @@ public final class TaskGroupExecutor { } // For the produced output - PipeImpl pipe = taskToOutputPipeMap.get(task); + LocalPipe pipe = taskToOutputPipeMap.get(task); while (!pipe.isEmpty()) { final Object element = pipe.remove(); diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/PipeImpl.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/LocalPipe.java similarity index 80% rename from runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/PipeImpl.java rename to runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/LocalPipe.java index fbcc662..05ec60e 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/PipeImpl.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/LocalPipe.java @@ -26,14 +26,14 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; /** - * Pipe implementation that requires synchronization. + * Local pipe implementation. * * @param output type. */ -public final class PipeImpl implements Pipe { - private static final Logger LOG = LoggerFactory.getLogger(PipeImpl.class.getName()); - private static final String PIPEID_PREFIX = "PIPE_"; - private static final AtomicInteger PIPEID_GENERATOR = new AtomicInteger(0); +public final class LocalPipe implements Pipe { + private static final Logger LOG = LoggerFactory.getLogger(LocalPipe.class.getName()); + private static final String LOCALPIPEID_PREFIX = "LOCALPIPE_"; + private static final AtomicInteger LOCALPIPEID_GENERATOR = new AtomicInteger(0); private final String id; private final ArrayDeque outputQueue; @@ -43,8 +43,8 @@ public final class PipeImpl implements Pipe { /** * Constructor of a new Pipe. */ - public PipeImpl() { - this.id = PIPEID_PREFIX + PIPEID_GENERATOR.getAndIncrement(); + public LocalPipe() { + this.id = LOCALPIPEID_PREFIX + LOCALPIPEID_GENERATOR.getAndIncrement(); this.outputQueue = new ArrayDeque<>(); this.sideInputRuntimeEdge = null; this.sideInputReceivers = new ArrayList<>(); @@ -57,11 +57,11 @@ public final class PipeImpl implements Pipe { @Override public void emit(final String dstVertexId, final Object output) { - throw new UnsupportedOperationException("emit(dstVertexId, output) in PipeImpl."); + throw new UnsupportedOperationException("emit(dstVertexId, output) in LocalPipe."); } /** - * Inter-Task data is transferred from sender-side Task's PipeImpl to receiver-side Task. + * Inter-Task data is transferred from sender-side Task's LocalPipe to receiver-side Task. * * @return the first element of this list */ -- To stop receiving notification emails like this one, please contact jeongyoon@apache.org.