Return-Path: X-Original-To: apmail-apex-commits-archive@minotaur.apache.org Delivered-To: apmail-apex-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 53C251727A for ; Wed, 9 Sep 2015 22:12:46 +0000 (UTC) Received: (qmail 17274 invoked by uid 500); 9 Sep 2015 22:12:46 -0000 Delivered-To: apmail-apex-commits-archive@apex.apache.org Received: (qmail 17236 invoked by uid 500); 9 Sep 2015 22:12:46 -0000 Mailing-List: contact commits-help@apex.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@apex.incubator.apache.org Delivered-To: mailing list commits@apex.incubator.apache.org Received: (qmail 17226 invoked by uid 99); 9 Sep 2015 22:12:46 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Sep 2015 22:12:46 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id C0BD5C009C for ; Wed, 9 Sep 2015 22:12:45 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.775 X-Spam-Level: * X-Spam-Status: No, score=1.775 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.006, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id bZGEUa7g6zIj for ; Wed, 9 Sep 2015 22:12:40 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id AEA2C4456F for ; Wed, 9 Sep 2015 22:12:33 +0000 (UTC) Received: (qmail 15146 invoked by uid 99); 9 Sep 2015 22:12:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Sep 2015 22:12:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 25D39E0991; Wed, 9 Sep 2015 22:12:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chetan@apache.org To: commits@apex.incubator.apache.org Date: Wed, 09 Sep 2015 22:12:42 -0000 Message-Id: <8b2634f095e345dd81c4628be1ed0547@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [11/50] incubator-apex-core git commit: Deprecated name property from BaseOperator Deprecated name property from BaseOperator Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/fe5d0356 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/fe5d0356 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/fe5d0356 Branch: refs/heads/master Commit: fe5d03560804a151e4c415939f3ca7edbd686b15 Parents: 19d6658 Author: Timothy Farkas Authored: Mon Aug 3 14:16:52 2015 -0700 Committer: Timothy Farkas Committed: Thu Aug 6 09:37:59 2015 -0700 ---------------------------------------------------------------------- .../datatorrent/common/util/BaseOperator.java | 2 + engine/pom.xml | 10 ++- .../stram/plan/logical/LogicalPlan.java | 6 +- .../com/datatorrent/stram/StreamCodecTest.java | 45 +++++------ .../stram/StreamingContainerManagerTest.java | 12 +-- .../datatorrent/stram/plan/LogicalPlanTest.java | 16 ++-- .../stram/plan/physical/PhysicalPlanTest.java | 9 +-- .../stram/webapp/OperatorDiscoveryTest.java | 78 ++++++++++---------- 8 files changed, 91 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/common/src/main/java/com/datatorrent/common/util/BaseOperator.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/util/BaseOperator.java b/common/src/main/java/com/datatorrent/common/util/BaseOperator.java index 0c2f8b3..f653d14 100644 --- a/common/src/main/java/com/datatorrent/common/util/BaseOperator.java +++ b/common/src/main/java/com/datatorrent/common/util/BaseOperator.java @@ -33,6 +33,7 @@ public class BaseOperator implements Operator /** * @return the name property of the operator. */ + @Deprecated public String getName() { return name; @@ -43,6 +44,7 @@ public class BaseOperator implements Operator * * @param name */ + @Deprecated public void setName(String name) { this.name = name; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/pom.xml ---------------------------------------------------------------------- diff --git a/engine/pom.xml b/engine/pom.xml index 1e6a7ed..c91265c 100644 --- a/engine/pom.xml +++ b/engine/pom.xml @@ -229,9 +229,15 @@ org.mockito - mockito-all - 1.8.5 + mockito-core + 1.10.19 test + + + org.hamcrest + hamcrest-core + + net.lingala.zip4j http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index fc182cd..9bfc2bd 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -854,10 +854,6 @@ public class LogicalPlan implements Serializable, DAG @Override public T addOperator(String name, T operator) { - // TODO: optional interface to provide contextual information to instance - if (operator instanceof BaseOperator) { - ((BaseOperator)operator).setName(name); - } if (operators.containsKey(name)) { if (operators.get(name) == (Object)operator) { return operator; @@ -1219,7 +1215,7 @@ public class LogicalPlan implements Serializable, DAG } } - // Validate root operators are input operators + // Validate root operators are input operators for (OperatorMeta om : this.rootOperators) { if (!(om.getOperator() instanceof InputOperator)) { throw new ValidationException(String.format("Root operator: %s is not a Input operator", http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java index 046425f..9726e65 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java @@ -73,14 +73,14 @@ public class StreamCodecTest LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2); LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3); - OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, node1.getName(), dnm); + OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm); OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1)); String id = n1meta.getName() + " " + n1odi.portName; Assert.assertEquals("number stream codecs " + id, n1odi.streamCodecs.size(), 1); Assert.assertTrue("No user set stream codec", n1odi.streamCodecs.containsValue(null)); - OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, node2.getName(), dnm); + OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, dnm); OperatorDeployInfo.InputDeployInfo n2idi = getInputDeployInfo(n2di, n2meta.getMeta(node2.inport1)); id = n2meta.getName() + " " + n2idi.portName; @@ -93,7 +93,7 @@ public class StreamCodecTest checkPresentStreamCodec(n3meta, node3.inport1, n2odi.streamCodecs, id, plan); - OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, node3.getName(), dnm); + OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, dnm); OperatorDeployInfo.InputDeployInfo n3idi = getInputDeployInfo(n3di, n3meta.getMeta(node3.inport1)); id = n3meta.getName() + " " + n3idi.portName; @@ -139,12 +139,12 @@ public class StreamCodecTest StreamingContainerManagerTest.assignContainer(dnm, "container" + (i + 1)); } - getSingleOperatorDeployInfo(node1, node1.getName(), dnm); - getSingleOperatorDeployInfo(node2, node2.getName(), dnm); - getSingleOperatorDeployInfo(node3, node3.getName(), dnm); - getSingleOperatorDeployInfo(node4, node4.getName(), dnm); - getSingleOperatorDeployInfo(node5, node5.getName(), dnm); - getSingleOperatorDeployInfo(node6, node6.getName(), dnm); + getSingleOperatorDeployInfo(node1, dnm); + getSingleOperatorDeployInfo(node2, dnm); + getSingleOperatorDeployInfo(node3, dnm); + getSingleOperatorDeployInfo(node4, dnm); + getSingleOperatorDeployInfo(node5, dnm); + getSingleOperatorDeployInfo(node6, dnm); Assert.assertEquals("number of stream codec identifiers", 3, plan.getStreamCodecIdentifiers().size()); } @@ -180,14 +180,14 @@ public class StreamCodecTest LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2); LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3); - OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, node1.getName(), dnm); + OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm); OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1)); String id = n1meta.getName() + " " + n1odi.portName; Assert.assertEquals("number stream codecs " + id, n1odi.streamCodecs.size(), 1); checkPresentStreamCodec(n2meta, node2.inportWithCodec, n1odi.streamCodecs, id, plan); - OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, node2.getName(), dnm); + OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, dnm); OperatorDeployInfo.InputDeployInfo n2idi = getInputDeployInfo(n2di, n2meta.getMeta(node2.inportWithCodec)); id = n2meta.getName() + " " + n2idi.portName; @@ -199,7 +199,7 @@ public class StreamCodecTest Assert.assertEquals("number stream codecs " + id, n2odi.streamCodecs.size(), 1); checkPresentStreamCodec(n3meta, node3.inportWithCodec, n2odi.streamCodecs, id, plan); - OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, node3.getName(), dnm); + OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, dnm); OperatorDeployInfo.InputDeployInfo n3idi = getInputDeployInfo(n3di, n3meta.getMeta(node3.inportWithCodec)); id = n3meta.getName() + " " + n3idi.portName; @@ -238,7 +238,7 @@ public class StreamCodecTest LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1); LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2); - OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, node1.getName(), dnm); + OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm); OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1)); String id = n1meta.getName() + " " + n1odi.portName; @@ -449,21 +449,21 @@ public class StreamCodecTest LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2); LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3); - OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, node1.getName(), dnm); + OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm); OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1)); String id = n1meta.getName() + " " + n1odi.portName; Assert.assertEquals("number stream codecs " + id, n1odi.streamCodecs.size(), 1); checkPresentStreamCodec(n2meta, node2.inport1, n1odi.streamCodecs, id, plan); - OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, node2.getName(), dnm); + OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, dnm); OperatorDeployInfo.InputDeployInfo n2idi = getInputDeployInfo(n2di, n2meta.getMeta(node2.inport1)); id = n2meta.getName() + " " + n2idi.portName; Assert.assertEquals("number stream codecs " + id, n2idi.streamCodecs.size(), 1); checkPresentStreamCodec(n2meta, node2.inport1, n2idi.streamCodecs, id, plan); - OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, node3.getName(), dnm); + OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, dnm); OperatorDeployInfo.InputDeployInfo n3idi = getInputDeployInfo(n3di, n3meta.getMeta(node3.inport1)); id = n3meta.getName() + " " + n3idi.portName; @@ -584,7 +584,7 @@ public class StreamCodecTest LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2); LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3); - OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, node1.getName(), dnm); + OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm); OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1)); String id = n1meta.getName() + " " + n1odi.portName; @@ -592,14 +592,14 @@ public class StreamCodecTest checkPresentStreamCodec(n2meta, node2.inport1, n1odi.streamCodecs, id, plan); checkPresentStreamCodec(n3meta, node3.inport1, n1odi.streamCodecs, id, plan); - OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, node2.getName(), dnm); + OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, dnm); OperatorDeployInfo.InputDeployInfo n2idi = getInputDeployInfo(n2di, n2meta.getMeta(node2.inport1)); id = n2meta.getName() + " " + n2idi.portName; Assert.assertEquals("number stream codecs " + id, n2idi.streamCodecs.size(), 1); checkPresentStreamCodec(n2meta, node2.inport1, n2idi.streamCodecs, id, plan); - OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, node3.getName(), dnm); + OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, dnm); OperatorDeployInfo.InputDeployInfo n3idi = getInputDeployInfo(n3di, n3meta.getMeta(node3.inport1)); id = n3meta.getName() + " " + n3idi.portName; @@ -855,14 +855,14 @@ public class StreamCodecTest Assert.assertNotNull("non inline operator is null", nonInlineOperator); - OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, node1.getName(), dnm); + OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm); OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1)); String id = n1meta.getName() + " " + n1odi.portName; Assert.assertEquals("number stream codecs " + id, n1odi.streamCodecs.size(), 1); checkPresentStreamCodec(nonInlineMeta, niInputPort, n1odi.streamCodecs, id, plan); - OperatorDeployInfo odi = getSingleOperatorDeployInfo(nonInlineOperator, nonInlineOperator.getName(), dnm); + OperatorDeployInfo odi = getSingleOperatorDeployInfo(nonInlineOperator, dnm); OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, nonInlineMeta.getMeta(niInputPort)); id = nonInlineMeta.getName() + " " + idi.portName; @@ -1218,9 +1218,10 @@ public class StreamCodecTest Assert.assertEquals("stream codec not same " + id, opStreamCodecInfo, streamCodecInfo); } - private OperatorDeployInfo getSingleOperatorDeployInfo(Operator oper, String id, StreamingContainerManager scm) + private OperatorDeployInfo getSingleOperatorDeployInfo(Operator oper, StreamingContainerManager scm) { LogicalPlan dag = scm.getLogicalPlan(); + String id = dag.getMeta(oper).toString(); PhysicalPlan plan = scm.getPhysicalPlan(); List operators = plan.getOperators(dag.getMeta(oper)); Assert.assertEquals("number of operators " + id, 1, operators.size()); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index ba15a78..38a54f0 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -188,8 +188,8 @@ public class StreamingContainerManagerTest { Assert.assertEquals("number operators assigned to container", 3, c2.size()); OperatorDeployInfo o2DI = getNodeDeployInfo(c2, dag.getMeta(o2)); OperatorDeployInfo o3DI = getNodeDeployInfo(c2, dag.getMeta(o3)); - Assert.assertNotNull(o2.getName() + " assigned to " + sca2.container.getExternalId(), o2DI); - Assert.assertNotNull(o3.getName() + " assigned to " + sca2.container.getExternalId(), o3DI); + Assert.assertNotNull(dag.getMeta(o2) + " assigned to " + sca2.container.getExternalId(), o2DI); + Assert.assertNotNull(dag.getMeta(o3) + " assigned to " + sca2.container.getExternalId(), o3DI); Assert.assertTrue("The buffer server memory for container 1", 256 == sca1.getInitContext().getValue(ContainerContext.BUFFER_SERVER_MB)); Assert.assertTrue("The buffer server memory for container 2", 0 == sca2.getInitContext().getValue(ContainerContext.BUFFER_SERVER_MB)); @@ -218,7 +218,7 @@ public class StreamingContainerManagerTest { // THREAD_LOCAL o4.inport1 OperatorDeployInfo o4DI = getNodeDeployInfo(c2, dag.getMeta(o4)); - Assert.assertNotNull(o4.getName() + " assigned to " + sca2.container.getExternalId(), o4DI); + Assert.assertNotNull(dag.getMeta(o4) + " assigned to " + sca2.container.getExternalId(), o4DI); InputDeployInfo c2o4i1 = getInputDeployInfo(o4DI, "o3.outport1"); Assert.assertNotNull("input from o3.outport1", c2o4i1); Assert.assertEquals("portName " + c2o4i1, GenericTestOperator.IPORT1, c2o4i1.portName); @@ -271,7 +271,7 @@ public class StreamingContainerManagerTest { StreamingContainerAgent sca1 = dnm.getContainerAgent(c.getExternalId()); List c1 = getDeployInfo(sca1); Assert.assertEquals("number operators assigned to container", 1, c1.size()); - Assert.assertTrue(node2.getName() + " assigned to " + sca1.container.getExternalId(), containsNodeContext(c1, dag.getMeta(node1))); + Assert.assertTrue(dag.getMeta(node2) + " assigned to " + sca1.container.getExternalId(), containsNodeContext(c1, dag.getMeta(node1))); List o2Partitions = plan.getOperators(dag.getMeta(node2)); Assert.assertEquals("number partitions", TestStaticPartitioningSerDe.partitions.length, o2Partitions.size()); @@ -280,7 +280,7 @@ public class StreamingContainerManagerTest { String containerId = o2Partitions.get(i).getContainer().getExternalId(); List cc = getDeployInfo(dnm.getContainerAgent(containerId)); Assert.assertEquals("number operators assigned to container", 1, cc.size()); - Assert.assertTrue(node2.getName() + " assigned to " + containerId, containsNodeContext(cc, dag.getMeta(node2))); + Assert.assertTrue(dag.getMeta(node2) + " assigned to " + containerId, containsNodeContext(cc, dag.getMeta(node2))); // n1n2 in, mergeStream out OperatorDeployInfo ndi = cc.get(0); @@ -338,7 +338,7 @@ public class StreamingContainerManagerTest { Assert.assertEquals("number operators " + cmerge, 1, cmerge.size()); OperatorDeployInfo node3DI = getNodeDeployInfo(cmerge, dag.getMeta(node3)); - Assert.assertNotNull(node3.getName() + " assigned", node3DI); + Assert.assertNotNull(dag.getMeta(node3) + " assigned", node3DI); Assert.assertEquals("inputs " + node3DI, 1, node3DI.inputs.size()); InputDeployInfo node3In = node3DI.inputs.get(0); Assert.assertEquals("streamName " + node3In, n2n3.getName(), node3In.declaredStreamId); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java index 24a9031..5bda8ee 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java @@ -89,16 +89,16 @@ public class LogicalPlanTest { dag.findStronglyConnected(dag.getMeta(operator7), cycles); assertEquals("operator self reference", 1, cycles.size()); assertEquals("operator self reference", 1, cycles.get(0).size()); - assertEquals("operator self reference", operator7.getName(), cycles.get(0).get(0)); + assertEquals("operator self reference", dag.getMeta(operator7).getName(), cycles.get(0).get(0)); // 3 operator cycle cycles.clear(); dag.findStronglyConnected(dag.getMeta(operator4), cycles); assertEquals("3 operator cycle", 1, cycles.size()); assertEquals("3 operator cycle", 3, cycles.get(0).size()); - assertTrue("operator2", cycles.get(0).contains(operator2.getName())); - assertTrue("operator3", cycles.get(0).contains(operator3.getName())); - assertTrue("operator4", cycles.get(0).contains(operator4.getName())); + assertTrue("operator2", cycles.get(0).contains(dag.getMeta(operator2).getName())); + assertTrue("operator3", cycles.get(0).contains(dag.getMeta(operator3).getName())); + assertTrue("operator4", cycles.get(0).contains(dag.getMeta(operator4).getName())); try { dag.validate(); @@ -294,7 +294,7 @@ public class LogicalPlanTest { Assert.fail("should throw ConstraintViolationException"); } catch (ConstraintViolationException e) { Assert.assertEquals("violation details", constraintViolations, e.getConstraintViolations()); - String expRegex = ".*ValidationTestOperator\\{name=testOperator}, propertyPath='intField1', message='must be greater than or equal to 2',.*value=1}]"; + String expRegex = ".*ValidationTestOperator\\{name=null}, propertyPath='intField1', message='must be greater than or equal to 2',.*value=1}]"; Assert.assertThat("exception message", e.getMessage(), RegexMatcher.matches(expRegex)); } @@ -396,7 +396,7 @@ public class LogicalPlanTest { dag.validate(); Assert.fail("should raise operator is not partitionable for operator1"); } catch (ValidationException e) { - Assert.assertEquals("", "Operator " + operator.getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e.getMessage()); + Assert.assertEquals("", "Operator " + dag.getMeta(operator).getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e.getMessage()); } dag.setAttribute(operator, OperatorContext.PARTITIONER, null); @@ -406,7 +406,7 @@ public class LogicalPlanTest { dag.validate(); Assert.fail("should raise operator is not partitionable for operator1"); } catch (ValidationException e) { - Assert.assertEquals("", "Operator " + operator.getName() + " is not partitionable but PARTITION_PARALLEL attribute is set", e.getMessage()); + Assert.assertEquals("", "Operator " + dag.getMeta(operator).getName() + " is not partitionable but PARTITION_PARALLEL attribute is set", e.getMessage()); } dag.setInputPortAttribute(operator.input1, PortContext.PARTITION_PARALLEL, false); @@ -419,7 +419,7 @@ public class LogicalPlanTest { dag.validate(); Assert.fail("should raise operator is not partitionable for operator2"); } catch (ValidationException e) { - Assert.assertEquals("Operator " + operator2.getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e.getMessage()); + Assert.assertEquals("Operator " + dag.getMeta(operator2).getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e.getMessage()); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java index 9382a4b..ccf930f 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java @@ -180,7 +180,6 @@ public class PhysicalPlanTest p.getPartitionKeys().put(this.inport1, lpks); p.getPartitionKeys().put(this.inportWithCodec, lpks); p.getPartitionedInstance().pks = p.getPartitionKeys().values().toString(); - p.getPartitionedInstance().setName(p.getPartitionKeys().values().toString()); newPartitions.add(p); } @@ -252,7 +251,7 @@ public class PhysicalPlanTest dag.addStream("node1.outport1", node1.outport1, node2.inport2, node2.inport1); int initialPartitionCount = 5; - OperatorMeta node2Decl = dag.getOperatorMeta(node2.getName()); + OperatorMeta node2Decl = dag.getMeta(node2); node2Decl.getAttributes().put(OperatorContext.PARTITIONER, new StatelessPartitioner(initialPartitionCount)); PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext()); @@ -350,7 +349,7 @@ public class PhysicalPlanTest dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2); - OperatorMeta o2Meta = dag.getOperatorMeta(o2.getName()); + OperatorMeta o2Meta = dag.getMeta(o2); o2Meta.getAttributes().put(OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener) new PartitionLoadWatch(0, 5))); o2Meta.getAttributes().put(OperatorContext.PARTITIONER, new StatelessPartitioner(1)); @@ -441,7 +440,7 @@ public class PhysicalPlanTest public void testInputOperatorPartitioning() { LogicalPlan dag = new LogicalPlan(); TestInputOperator o1 = dag.addOperator("o1", new TestInputOperator()); - OperatorMeta o1Meta = dag.getOperatorMeta(o1.getName()); + OperatorMeta o1Meta = dag.getMeta(o1); dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); dag.setAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner>(2)); @@ -509,7 +508,7 @@ public class PhysicalPlanTest dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2); - OperatorMeta node2Meta = dag.getOperatorMeta(o2.getName()); + OperatorMeta node2Meta = dag.getMeta(o2); node2Meta.getAttributes().put(OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener) new PartitionLoadWatch(3, 5))); node2Meta.getAttributes().put(OperatorContext.PARTITIONER, new StatelessPartitioner(8)); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java index ad915c8..8baa08a 100644 --- a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java +++ b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java @@ -57,7 +57,7 @@ import com.google.common.collect.Lists; public class OperatorDiscoveryTest { // private static final Logger LOG = LoggerFactory.getLogger(OperatorDiscoveryTest.class); - + public static class GenericClassBase extends BaseOperator { private int A; @@ -271,7 +271,7 @@ public class OperatorDiscoveryTest @Test public void testPropertyDiscovery() throws Exception { - + String[] classFilePath = getClassFileInClasspath(); OperatorDiscoverer od = new OperatorDiscoverer(classFilePath); od.buildTypeGraph(); @@ -343,8 +343,8 @@ public class OperatorDiscoveryTest props = desc.getJSONArray("properties"); genericArray = getJSONProperty(props, "genericArray"); Assert.assertEquals(debug + "type " + genericArray, String[].class.getName(), genericArray.get("type")); - - + + // Test complicated Type Variable override in Hierarchy desc = od.describeClassByASM(SubSubClass.class.getName()); props = desc.getJSONArray("properties"); @@ -554,9 +554,9 @@ public class OperatorDiscoveryTest return false; return true; } - - - + + + } @@ -573,7 +573,7 @@ public class OperatorDiscoveryTest private long longProp; private double doubleProp; private boolean booleanProp; - + private Integer integerProp; private List stringList; private List nestedList; @@ -612,39 +612,39 @@ public class OperatorDiscoveryTest { return mProp; } - + public String getAlias() { return realName; } - + public void setAlias(String alias) { realName = alias; } - + public String getGetterOnly() { return getterOnly; } - - + + public URI getUri() { return uri; } - + public void setUri(URI uri) { this.uri = uri; } - - + + public void setIntegerProp(Integer integerProp) { this.integerProp = integerProp; } - + public Integer getIntegerProp() { return integerProp; @@ -734,7 +734,7 @@ public class OperatorDiscoveryTest { return stringArray; } - + public void setStringArray(String[] stringArray) { this.stringArray = stringArray; @@ -858,15 +858,15 @@ public class OperatorDiscoveryTest static class ExtendedOperator extends TestOperator> { } - + public static class BaseClass { private A a; - + private B b; private C c; - + public void setA(A a) { this.a = a; @@ -875,12 +875,12 @@ public class OperatorDiscoveryTest { this.b = b; } - + public A getA() { return a; } - + public B getB() { return b; @@ -890,7 +890,7 @@ public class OperatorDiscoveryTest { this.c = c; } - + public C getC() { return c; @@ -900,28 +900,28 @@ public class OperatorDiscoveryTest public static class SubClass extends BaseClass { private D d; - + public void setD(D d) { this.d = d; } - + public D getD() { return d; } - + } public static class SubSubClass extends SubClass, Long> { private E e; - + public void setE(E e) { this.e = e; } - + public E getE() { return e; @@ -975,7 +975,7 @@ public class OperatorDiscoveryTest Assert.assertArrayEquals(ah.intArray, clone.intArray); } - + @Test public void testLogicalPlanConfiguration() throws Exception { @@ -995,13 +995,13 @@ public class OperatorDiscoveryTest ObjectMapper mapper = ObjectMapperFactory.getOperatorValueSerializer(); String s = mapper.writeValueAsString(bean); // LOG.debug(new JSONObject(s).toString(2)); - // + // Assert.assertTrue("Shouldn't contain field 'realName' !", !s.contains("realName")); Assert.assertTrue("Should contain property 'alias' !", s.contains("alias")); Assert.assertTrue("Shouldn't contain property 'getterOnly' !", !s.contains("getterOnly")); JSONObject jsonObj = new JSONObject(s); - - // create the json dag representation + + // create the json dag representation JSONObject jsonPlan = new JSONObject(); jsonPlan.put("streams", new JSONArray()); JSONObject jsonOper = new JSONObject(); @@ -1009,17 +1009,17 @@ public class OperatorDiscoveryTest jsonOper.put("class", TestOperator.class.getName()); jsonOper.put("properties", jsonObj); jsonPlan.put("operators", new JSONArray(Lists.newArrayList(jsonOper))); - - + + Configuration conf = new Configuration(false); LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf); - // create logical plan from the json + // create logical plan from the json LogicalPlan lp = lpc.createFromJson(jsonPlan, "jsontest"); OperatorMeta om = lp.getOperatorMeta("Test Operator"); Assert.assertTrue(om.getOperator() instanceof TestOperator); @SuppressWarnings("rawtypes") TestOperator beanBack = (TestOperator) om.getOperator(); - + // The operator deserialized back from json should be same as original operator Assert.assertEquals(bean.map, beanBack.map); Assert.assertArrayEquals(bean.stringArray, beanBack.stringArray); @@ -1031,8 +1031,8 @@ public class OperatorDiscoveryTest Assert.assertEquals(bean.booleanProp, beanBack.booleanProp); Assert.assertEquals(bean.realName, beanBack.realName); Assert.assertEquals(bean.getterOnly, beanBack.getterOnly); - - + + } public static class SchemaRequiredOperator extends BaseOperator implements InputOperator