Return-Path: X-Original-To: apmail-apex-commits-archive@minotaur.apache.org Delivered-To: apmail-apex-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id ED8FC18ED5 for ; Sat, 23 Jan 2016 03:53:13 +0000 (UTC) Received: (qmail 6675 invoked by uid 500); 23 Jan 2016 03:53:13 -0000 Delivered-To: apmail-apex-commits-archive@apex.apache.org Received: (qmail 6626 invoked by uid 500); 23 Jan 2016 03:53:13 -0000 Mailing-List: contact commits-help@apex.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@apex.incubator.apache.org Delivered-To: mailing list commits@apex.incubator.apache.org Received: (qmail 6615 invoked by uid 99); 23 Jan 2016 03:53:13 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 23 Jan 2016 03:53:13 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 252761804B7 for ; Sat, 23 Jan 2016 03:53:13 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.247 X-Spam-Level: * X-Spam-Status: No, score=1.247 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-0.554, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id UiaUuwYFmwGW for ; Sat, 23 Jan 2016 03:53:04 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id C07C823016 for ; Sat, 23 Jan 2016 03:53:03 +0000 (UTC) Received: (qmail 6484 invoked by uid 99); 23 Jan 2016 03:53:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 23 Jan 2016 03:53:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 76D86DFDE0; Sat, 23 Jan 2016 03:53:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: thw@apache.org To: commits@apex.incubator.apache.org Date: Sat, 23 Jan 2016 03:53:03 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/3] incubator-apex-core git commit: moved attribute from context to logical plan moved attribute from context to logical plan Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/4d5828c6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/4d5828c6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/4d5828c6 Branch: refs/heads/devel-3 Commit: 4d5828c6ca48f5d28cd8c77c5706c6f72c7cd1ad Parents: f7e1ccf Author: Gaurav Authored: Wed Dec 16 06:33:54 2015 -0800 Committer: David Yan Committed: Fri Jan 22 19:04:27 2016 -0800 ---------------------------------------------------------------------- api/src/main/java/com/datatorrent/api/Context.java | 7 ------- .../main/java/com/datatorrent/stram/engine/GenericNode.java | 3 ++- .../java/com/datatorrent/stram/plan/logical/LogicalPlan.java | 8 +++++++- .../com/datatorrent/stram/plan/physical/PhysicalPlan.java | 4 ++-- .../com/datatorrent/stram/plan/physical/StreamMapping.java | 2 +- 5 files changed, 12 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/api/src/main/java/com/datatorrent/api/Context.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java index 58bc552..ceed8a2 100644 --- a/api/src/main/java/com/datatorrent/api/Context.java +++ b/api/src/main/java/com/datatorrent/api/Context.java @@ -166,13 +166,6 @@ public interface Context */ Attribute> TUPLE_CLASS = new Attribute<>(new Class2String<>()); - /** - * Attribute of input port. - * This is a read-only attribute to query whether the input port is connected to a DelayOperator - * This is for iterative processing. - */ - Attribute IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false); - @SuppressWarnings("FieldNameHidesFieldInSuperclass") long serialVersionUID = AttributeMap.AttributeInitializer.initialize(PortContext.class); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java index 4777f93..1ccec31 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java @@ -40,6 +40,7 @@ import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.netlet.util.CircularBuffer; import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats; import com.datatorrent.stram.debug.TappedReservoir; +import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.plan.logical.Operators; import com.datatorrent.stram.tuple.ResetWindowTuple; import com.datatorrent.stram.tuple.Tuple; @@ -207,7 +208,7 @@ public class GenericNode extends Node if (pcPair == null || pcPair.context == null) { return false; } - return pcPair.context.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR); + return pcPair.context.getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR); } /** http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 3c26118..883ad71 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -76,6 +76,12 @@ import com.datatorrent.stram.engine.Slider; */ public class LogicalPlan implements Serializable, DAG { + /** + * Attribute of input port. + * This is a read-only attribute to query whether the input port is connected to a DelayOperator + * This is for iterative processing. + */ + public static final Attribute IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false); @SuppressWarnings("FieldNameHidesFieldInSuperclass") private static final long serialVersionUID = -2099729915606048704L; private static final Logger LOG = LoggerFactory.getLogger(LogicalPlan.class); @@ -1914,7 +1920,7 @@ public class LogicalPlan implements Serializable, DAG for (InputPortMeta sink: downStream.sinks) { if (om.getOperator() instanceof Operator.DelayOperator) { // this is an iteration loop, do not treat it as downstream when detecting cycles - sink.attributes.put(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR, true); + sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, true); continue; } OperatorMeta successor = sink.getOperatorWrapper(); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java index da96ef3..c696224 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java @@ -948,11 +948,11 @@ public class PhysicalPlan implements Serializable PTOperator slidingUnifier = StreamMapping.createSlidingUnifier(sourceOut.logicalStream, this, sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT), slidingWindowCount); StreamMapping.addInput(slidingUnifier, sourceOut, null); - input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0), ipm.getKey().getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR)); + input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0), ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR)); sourceMapping.outputStreams.get(ipm.getValue().getSource()).slidingUnifiers.add(slidingUnifier); } else { - input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut, ipm.getKey().getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR)); + input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut, ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR)); } oper.inputs.add(input); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java index 91c6eef..f30ceb6 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java @@ -347,7 +347,7 @@ public class StreamMapping implements java.io.Serializable // link to upstream output(s) for this stream for (PTOutput upstreamOut : sourceOper.outputs) { if (upstreamOut.logicalStream == streamMeta) { - PTInput input = new PTInput(ipm.getPortName(), streamMeta, oper, pks, upstreamOut, ipm.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR)); + PTInput input = new PTInput(ipm.getPortName(), streamMeta, oper, pks, upstreamOut, ipm.getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR)); oper.inputs.add(input); } }