From: vrozov@apache.org
To: commits@apex.apache.org
Date: Sun, 09 Apr 2017 20:53:18 -0000
Subject: [2/2] apex-core git commit: APEXCORE-658 Enable apex. prefix for configuration keys.

APEXCORE-658 Enable apex. prefix for configuration keys.
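
For readers skimming the change: the commit makes "apex." the primary key prefix while still accepting the legacy "dt." prefix. The snippet below is a minimal standalone sketch of that dual-prefix check, not the code from the commit. The class name PrefixFilterSketch and the helpers isRelevantKey and filterRelevant are hypothetical; only the two prefix values are taken from StreamingApplication in the diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class; it does not exist in apex-core.
class PrefixFilterSketch {
    // Values mirror StreamingApplication.APEX_PREFIX and DT_PREFIX from the diff.
    static final String APEX_PREFIX = "apex.";
    static final String DT_PREFIX = "dt."; // legacy prefix, deprecated by this commit

    // A key is relevant when it starts with either the new or the legacy prefix.
    static boolean isRelevantKey(String key) {
        return key.startsWith(DT_PREFIX) || key.startsWith(APEX_PREFIX);
    }

    // Copies only the relevant entries, preserving the input iteration order.
    static Map<String, String> filterRelevant(Map<String, String> conf) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : conf.entrySet()) {
            if (isRelevantKey(e.getKey())) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("apex.attr.MASTER_MEMORY_MB", "1024");
        conf.put("dt.operator.redis.prop.host", "127.0.0.1");
        conf.put("fs.defaultFS", "hdfs://localhost"); // unrelated key, filtered out
        System.out.println(filterRelevant(conf).keySet());
    }
}
```

Under this sketch, keys written with either prefix pass the filter and unrelated keys are dropped. How the real code resolves a key that is set under both prefixes is not covered here.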
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/412a3bd8
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/412a3bd8
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/412a3bd8

Branch: refs/heads/master
Commit: 412a3bd81fbfc973858f36172533c0d5ab83e39f
Parents: aa81bea
Author: Thomas Weise
Authored: Mon Mar 27 20:54:51 2017 -0700
Committer: Thomas Weise
Committed: Sun Apr 9 13:42:38 2017 -0700

----------------------------------------------------------------------
 .../src/main/resources/META-INF/properties.xml  |   8 +-
 .../src/site/conf/my-app-conf1.xml              |   4 +-
 .../src/main/resources/META-INF/properties.xml  |  10 +-
 .../main/java/com/datatorrent/api/Context.java  |   2 +-
 .../datatorrent/api/StreamingApplication.java   |  10 +
 docs/application_development.md                 |  14 +-
 docs/application_packages.md                    |  56 ++---
 .../java/com/datatorrent/stram/cli/ApexCli.java |  16 +-
 .../plan/logical/LogicalPlanConfiguration.java  |  40 ++--
 .../datatorrent/stram/StramMiniClusterTest.java |  40 ++--
 .../logical/LogicalPlanConfigurationTest.java   | 238 +++++++++++--------
 .../logical/module/TestModuleExpansion.java     |   2 +-
 .../logical/module/TestModuleProperties.java    |  10 +-
 .../test/resources/clusterTest.app.properties   |  44 ----
 engine/src/test/resources/dt-site.xml           |  28 +--
 .../resources/testModuleTopology.properties     |  62 ++---
 .../src/test/resources/testTopology.properties  |  36 +--
 .../testTopologyLegacyPrefix.properties         |  27 +++
 18 files changed, 341 insertions(+), 306 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/apex-app-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/apex-app-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml
b/apex-app-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml index 876c39a..34679b6 100644 --- a/apex-app-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml +++ b/apex-app-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml @@ -2,22 +2,22 @@ - dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples + apex.application.MyFirstApplication.operator.randomGenerator.prop.numTuples 1000 - dt.application.MyFirstApplication.operator.console.prop.stringFormat + apex.application.MyFirstApplication.operator.console.prop.stringFormat hello world: %s http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/apex-app-archetype/src/main/resources/archetype-resources/src/site/conf/my-app-conf1.xml ---------------------------------------------------------------------- diff --git a/apex-app-archetype/src/main/resources/archetype-resources/src/site/conf/my-app-conf1.xml b/apex-app-archetype/src/main/resources/archetype-resources/src/site/conf/my-app-conf1.xml index ccb2b66..7ceba7c 100644 --- a/apex-app-archetype/src/main/resources/archetype-resources/src/site/conf/my-app-conf1.xml +++ b/apex-app-archetype/src/main/resources/archetype-resources/src/site/conf/my-app-conf1.xml @@ -1,11 +1,11 @@ - dt.attr.MASTER_MEMORY_MB + apex.attr.MASTER_MEMORY_MB 1024 - dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples + apex.application.MyFirstApplication.operator.randomGenerator.prop.numTuples 1000 http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/apex-conf-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/apex-conf-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml 
b/apex-conf-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml index 9044325..0ee7dc2 100644 --- a/apex-conf-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml +++ b/apex-conf-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml @@ -2,24 +2,24 @@ - dt.attr.MASTER_MEMORY_MB + apex.attr.MASTER_MEMORY_MB 1024 - dt.application.MyFirstApplication.operator.seedGen.prop.seedStart + apex.application.MyFirstApplication.operator.seedGen.prop.seedStart 1 - dt.application.MyFirstApplication.operator.seedGen.prop.seedEnd + apex.application.MyFirstApplication.operator.seedGen.prop.seedEnd 10 - dt.application.MyFirstApplication.operator.console.prop.stringFormat + apex.application.MyFirstApplication.operator.console.prop.stringFormat hello world: %s http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/api/src/main/java/com/datatorrent/api/Context.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java index 94022ff..3f7d96c 100644 --- a/api/src/main/java/com/datatorrent/api/Context.java +++ b/api/src/main/java/com/datatorrent/api/Context.java @@ -426,7 +426,7 @@ public interface Context */ Attribute TUPLE_RECORDING_PART_FILE_TIME_MILLIS = new Attribute<>(30 * 60 * 60 * 1000); /** - * Address to which the application side connects to DT Gateway, in the form of host:port. This will override "dt.gateway.listenAddress" in the configuration. + * Address of optional web-socket pub-sub gateway to emit application events, in the form of host:port. 
*/ Attribute GATEWAY_CONNECT_ADDRESS = new Attribute<>(String2String.getInstance()); /** http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/api/src/main/java/com/datatorrent/api/StreamingApplication.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/StreamingApplication.java b/api/src/main/java/com/datatorrent/api/StreamingApplication.java index cf1d6ec..854168a 100644 --- a/api/src/main/java/com/datatorrent/api/StreamingApplication.java +++ b/api/src/main/java/com/datatorrent/api/StreamingApplication.java @@ -39,6 +39,16 @@ import org.apache.hadoop.conf.Configuration; */ public interface StreamingApplication { + /** + * Prefix used in configuration keys. + */ + String APEX_PREFIX = "apex."; + + /** + * Legacy prefix, to be removed in future release, + * when all code dependencies are upgraded. + */ + @Deprecated String DT_PREFIX = "dt."; /** * Launch mode for the application. http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/docs/application_development.md ---------------------------------------------------------------------- diff --git a/docs/application_development.md b/docs/application_development.md index 6fe1d4f..07cfd81 100644 --- a/docs/application_development.md +++ b/docs/application_development.md @@ -1255,15 +1255,15 @@ For example `myApplication.properties` ``` # input operator that reads from a file -dt.operator.inputOp.classname=com.acme.SampleInputOperator -dt.operator.inputOp.fileName=somefile.txt +apex.operator.inputOp.classname=com.acme.SampleInputOperator +apex.operator.inputOp.fileName=somefile.txt # output operator that writes to the console -dt.operator.outputOp.classname=com.acme.ConsoleOutputOperator +apex.operator.outputOp.classname=com.acme.ConsoleOutputOperator # stream connecting both operators -dt.stream.inputStream.source=inputOp.outputPort -dt.stream.inputStream.sinks=outputOp.inputPort 
+apex.stream.inputStream.source=inputOp.outputPort +apex.stream.inputStream.sinks=outputOp.inputPort ``` @@ -1776,7 +1776,7 @@ The last configurable parameter for affinity rules is strict or preferred rule. The same set of rules can also be added from properties.xml by setting value for attribute DAGContext.AFFINITY_RULES_SET as JSON string. For example: ```xml - dt.application.AffinityRulesSampleApplication.attr.AFFINITY_RULES_SET + apex.application.AffinityRulesSampleApplication.attr.AFFINITY_RULES_SET { "affinityRules": [ @@ -2952,4 +2952,4 @@ The source code for the demos is available in the open-source [Apache Apex-Malhar repository](https://github.com/apache/apex-malhar). All of these do computations in real-time. Developers are encouraged to review them as they use various features of the platform and provide an -opportunity for quick learning. \ No newline at end of file +opportunity for quick learning. http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/docs/application_packages.md ---------------------------------------------------------------------- diff --git a/docs/application_packages.md b/docs/application_packages.md index 891ecee..74886fc 100644 --- a/docs/application_packages.md +++ b/docs/application_packages.md @@ -23,26 +23,26 @@ line, or using your favorite IDE. First, change to the directory where you put your projects, and create an Apex application project using Maven by running the following -command. Replace "com.example", "mydtapp" and "1.0-SNAPSHOT" with the +command. Replace "com.example", "myapp" and "1.0-SNAPSHOT" with the appropriate values (make sure this is all on one line): $ mvn archetype:generate \ -DarchetypeGroupId=org.apache.apex \ -DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=3.4.0 \ - -DgroupId=com.example -Dpackage=com.example.mydtapp -DartifactId=mydtapp \ + -DgroupId=com.example -Dpackage=com.example.myapp -DartifactId=myapp \ -Dversion=1.0-SNAPSHOT -This creates a Maven project named "mydtapp". 
Open it with your favorite +This creates a Maven project named "myapp". Open it with your favorite IDE (e.g. NetBeans, Eclipse, IntelliJ IDEA). In the project, there is a sample DAG that generates a number of tuples with a random number and prints out "hello world" and the random number in the tuples. The code that builds the DAG is in -src/main/java/com/example/mydtapp/Application.java, and the code that +src/main/java/com/example/myapp/Application.java, and the code that runs the unit test for the DAG is in -src/test/java/com/example/mydtapp/ApplicationTest.java. Try it out by +src/test/java/com/example/myapp/ApplicationTest.java. Try it out by running the following command: - $cd mydtapp; mvn package + $cd myapp; mvn package This builds the App Package runs the unit test of the DAG. You should be getting test output similar to this: @@ -52,7 +52,7 @@ be getting test output similar to this: TESTS ------------------------------------------------------- - Running com.example.mydtapp.ApplicationTest + Running com.example.myapp.ApplicationTest hello world: 0.8015370953286478 hello world: 0.9785359225545481 hello world: 0.6322611586644047 @@ -76,7 +76,7 @@ be getting test output similar to this: ``` The "mvn package" command creates the App Package file in target -directory as target/mydtapp-1.0-SNAPSHOT.apa. You will be able to use +directory as target/myapp-1.0-SNAPSHOT.apa. You will be able to use that App Package file to launch this sample application in your actual Apex installation. @@ -132,7 +132,7 @@ the default pom.xml: ``` By default, as shown above, the default dependencies include -malhar-library in compile scope, dt-engine in provided scope, and junit +malhar-library in compile scope, apex-engine in provided scope, and junit in test scope. Do not remove these three dependencies since they are necessary for any Apex application. 
You can, however, exclude transitive dependencies from malhar-library to reduce the size of your @@ -191,14 +191,14 @@ as name value pairs, in XML format, like the following. Application attributes are used to specify the platform behavior for the application. They can be specified using the parameter -```dt.attr.```. The prefix “dt” is a constant, “attr” is a +```apex.attr.```. The prefix “apex” is a constant, “attr” is a constant denoting an attribute is being specified and `````` specifies the name of the attribute. Below is an example snippet setting the streaming windows size of the application to be 1000 milliseconds. ``` - dt.attr.STREAMING_WINDOW_SIZE_MILLIS + apex.attr.STREAMING_WINDOW_SIZE_MILLIS 1000 ``` @@ -213,7 +213,7 @@ be specified in the format described above. Operator attributes are used to specify the platform behavior for the operator. They can be specified using the parameter -```dt.operator..attr.```. The prefix “dt” is a +```apex.operator..attr.```. The prefix “apex” is a constant, “operator” is a constant denoting that an operator is being specified, `````` denotes the name of the operator, “attr” is the constant denoting that an attribute is being specified and @@ -225,7 +225,7 @@ application window of an operator named “input” to be 10 ``` - dt.operator.input.attr.APPLICATION_WINDOW_COUNT + apex.operator.input.attr.APPLICATION_WINDOW_COUNT 10 ``` @@ -240,7 +240,7 @@ can be specified in the format described above. Operators can be configured using operator specific properties. The properties can be specified using the parameter -```dt.operator..prop.```. The difference +```apex.operator..prop.```. The difference between this and the operator attribute specification described above is that the keyword “prop” is used to denote that it is a property and `````` specifies the property name. An example illustrating @@ -249,7 +249,7 @@ redis server for a “redis” output operator. 
``` - dt.operator.redis.prop.host + apex.operator.redis.prop.host 127.0.0.1 ``` @@ -265,8 +265,8 @@ will be called on the “redis” operator with “127.0.0.1” as the argument. ### Port attributes Port attributes are used to specify the platform behavior for input and -output ports. They can be specified using the parameter ```dt.operator..inputport..attr.``` -for input port and ```dt.operator..outputport..attr.``` +output ports. They can be specified using the parameter ```apex.operator..inputport..attr.``` +for input port and ```apex.operator..outputport..attr.``` for output port. The keyword “inputport” is used to denote an input port and “outputport” to denote an output port. The rest of the specification follows the conventions described in other specifications above. An @@ -276,7 +276,7 @@ be 4k. ``` - dt.operator.range.inputport.input.attr.QUEUE_CAPACITY + apex.operator.range.inputport.input.attr.QUEUE_CAPACITY 4000 ``` @@ -297,7 +297,7 @@ specification described below. Streams can be configured using stream properties. The properties can be specified using the parameter -```dt.stream..prop.``` The constant “stream” +```apex.stream..prop.``` The constant “stream” specifies that it is a stream, `````` specifies the name of the stream and `````` the name of the property. The name of the stream is the same name that is passed when the stream is added to the @@ -308,7 +308,7 @@ connecting be run in the same container. ``` - dt.stream.stream1.prop.locality + apex.stream.stream1.prop.locality CONTAINER_LOCAL ``` @@ -339,7 +339,7 @@ example, to specify an attribute for all ports of an operator it can be done as follows ``` - dt.operator.range.port.*.attr.QUEUE_CAPACITY + apex.operator.range.port.*.attr.QUEUE_CAPACITY 4000 ``` @@ -372,13 +372,13 @@ project. 
The properties.xml may look like: The name of an application-specific property takes the form of: -```dt.operator.{opName}.prop.{propName} ``` +```apex.operator.{opName}.prop.{propName} ``` The first represents the property with name propName of operator opName. Or you can set the application name at run time by setting this property: - dt.attr.APPLICATION_NAME + apex.attr.APPLICATION_NAME In this example, property some_name_1 is a required property which @@ -531,12 +531,12 @@ additional files to be packaged, you can use an Apex Configuration Package. ## Creating Configuration Packages Creating Configuration Packages is similar to creating Application Packages. You can create a configuration -package project using Maven by running the following command. Replace "com.example", "mydtconfig" and "1.0-SNAPSHOT" with the appropriate values: +package project using Maven by running the following command. Replace "com.example", "myconfig" and "1.0-SNAPSHOT" with the appropriate values: ``` $ mvn archetype:generate -DarchetypeGroupId=org.apache.apex \ -DarchetypeArtifactId=apex-conf-archetype -DarchetypeVersion=3.4.0 \ - -DgroupId=com.example -Dpackage=com.example.mydtconfig -DartifactId=mydtconfig \ + -DgroupId=com.example -Dpackage=com.example.myconfig -DartifactId=myconfig \ -Dversion=1.0-SNAPSHOT ``` @@ -547,7 +547,7 @@ $ mvn package ``` The "mvn package" command creates the Config Package file in target -directory as target/mydtconfig.apc. You will be able to use that +directory as target/myconfig.apc. You will be able to use that Configuration Package file to launch an Apache Apex application. ## Assembling your own configuration package @@ -568,7 +568,7 @@ Example: ```xml com.example 1.0.0 - mydtconf + myconfig jar My Apex Application Configuration @@ -661,7 +661,7 @@ files `-conf` option of the launch command in CLI supports specifying configuration package in the local filesystem. 
Example: - dt\> launch mydtapp-1.0.0.apa -conf mydtconfig.apc + apex\> launch myapp-1.0.0.apa -conf myconfig.apc This command expects both the application package and the configuration package to be in the local file system. http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java index 77959ab..9a7b128 100644 --- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java +++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java @@ -92,7 +92,6 @@ import org.apache.tools.ant.DirectoryScanner; import com.google.common.base.Preconditions; import com.sun.jersey.api.client.WebResource; -import com.datatorrent.api.Context; import com.datatorrent.api.DAG.GenericOperator; import com.datatorrent.api.Operator; import com.datatorrent.api.StreamingApplication; @@ -112,6 +111,7 @@ import com.datatorrent.stram.client.StramClientUtils; import com.datatorrent.stram.client.StramClientUtils.ClientRMHelper; import com.datatorrent.stram.codec.LogicalPlanSerializer; import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration; import com.datatorrent.stram.plan.logical.requests.AddStreamSinkRequest; import com.datatorrent.stram.plan.logical.requests.CreateOperatorRequest; import com.datatorrent.stram.plan.logical.requests.CreateStreamRequest; @@ -2108,13 +2108,11 @@ public class ApexCli if (appFactory != null) { if (!commandLineInfo.localMode) { - // see whether there is an app with the same name and user name running - String appNameAttributeName = StreamingApplication.DT_PREFIX + Context.DAGContext.APPLICATION_NAME.getName(); - String appName = config.get(appNameAttributeName, appFactory.getName()); + String appName = 
config.get(LogicalPlanConfiguration.KEY_APPLICATION_NAME, appFactory.getName()); ApplicationReport duplicateApp = StramClientUtils.getStartedAppInstanceByName(yarnClient, appName, UserGroupInformation.getLoginUser().getUserName(), null); if (duplicateApp != null) { - throw new CliException("Application with the name \"" + duplicateApp.getName() + "\" already running under the current user \"" + duplicateApp.getUser() + "\". Please choose another name. You can change the name by setting " + appNameAttributeName); + throw new CliException("Application with the name \"" + duplicateApp.getName() + "\" already running under the current user \"" + duplicateApp.getUser() + "\". Please choose another name. You can change the name by setting " + LogicalPlanConfiguration.KEY_APPLICATION_NAME); } // This is for suppressing System.out printouts from applications so that the user of CLI will not be confused by those printouts @@ -3788,9 +3786,11 @@ public class ApexCli while (it.hasNext()) { Entry entry = it.next(); // filter relevant entries - if (entry.getKey().startsWith(StreamingApplication.DT_PREFIX)) { - launchProperties.set(entry.getKey(), entry.getValue(), Scope.TRANSIENT, null); - requiredProperties.remove(entry.getKey()); + String key = entry.getKey(); + if (key.startsWith(StreamingApplication.DT_PREFIX) + || key.startsWith(StreamingApplication.APEX_PREFIX)) { + launchProperties.set(key, entry.getValue(), Scope.TRANSIENT, null); + requiredProperties.remove(key); } } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java index ffe33f3..5a9030e 100644 --- 
a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java @@ -112,14 +112,14 @@ public class LogicalPlanConfiguration public static final String GATEWAY_PREFIX = StreamingApplication.DT_PREFIX + "gateway."; public static final String GATEWAY_LISTEN_ADDRESS = GATEWAY_PREFIX + "listenAddress"; - public static final String STREAM_PREFIX = StreamingApplication.DT_PREFIX + "stream."; + public static final String STREAM_PREFIX = StreamingApplication.APEX_PREFIX + "stream."; public static final String STREAM_SOURCE = "source"; public static final String STREAM_SINKS = "sinks"; public static final String STREAM_TEMPLATE = "template"; public static final String STREAM_LOCALITY = "locality"; public static final String STREAM_SCHEMA = "schema"; - public static final String OPERATOR_PREFIX = StreamingApplication.DT_PREFIX + "operator."; + public static final String OPERATOR_PREFIX = StreamingApplication.APEX_PREFIX + "operator."; public static final String OPERATOR_CLASSNAME = "classname"; public static final String OPERATOR_TEMPLATE = "template"; @@ -144,6 +144,16 @@ public class LogicalPlanConfiguration LOG.debug("Initialized attributes {}", serial); } + public static final String KEY_APPLICATION_NAME = keyAndDeprecation(Context.DAGContext.APPLICATION_NAME); + public static final String KEY_GATEWAY_CONNECT_ADDRESS = keyAndDeprecation(Context.DAGContext.GATEWAY_CONNECT_ADDRESS); + + private static String keyAndDeprecation(Attribute attr) + { + String key = StreamingApplication.APEX_PREFIX + attr.getName(); + Configuration.addDeprecation(StreamingApplication.DT_PREFIX + attr.getName(), key); + return key; + } + private final DAGSetupPluginManager pluginManager; /** @@ -286,7 +296,7 @@ public class LogicalPlanConfiguration ambiguousAttributes.addAll(childElement.getAmbiguousAttributes()); @SuppressWarnings("unchecked") - Set intersection = 
(Set)Sets.newHashSet(CollectionUtils.intersection(allChildAttributes, allAttributes)); + Set intersection = Sets.newHashSet(CollectionUtils.intersection(allChildAttributes, allAttributes)); ambiguousAttributes.addAll(intersection); allChildAttributes.addAll(allAttributes); } @@ -1661,19 +1671,16 @@ public class LogicalPlanConfiguration */ public final void addFromConfiguration(Configuration conf) { - addFromProperties(toProperties(conf, StreamingApplication.DT_PREFIX), null); + addFromProperties(toProperties(conf), null); } - public static Properties toProperties(Configuration conf, String prefix) + private static Properties toProperties(Configuration conf) { Iterator> it = conf.iterator(); Properties props = new Properties(); while (it.hasNext()) { Entry e = it.next(); - // filter relevant entries - if (e.getKey().startsWith(prefix)) { - props.put(e.getKey(), e.getValue()); - } + props.put(e.getKey(), e.getValue()); } return props; } @@ -1713,7 +1720,7 @@ public class LogicalPlanConfiguration JSONArray operatorArray = json.getJSONArray("operators"); for (int i = 0; i < operatorArray.length(); i++) { JSONObject operator = operatorArray.getJSONObject(i); - String operatorPrefix = StreamingApplication.DT_PREFIX + StramElement.OPERATOR.getValue() + KEY_SEPARATOR + operator.getString("name") + "."; + String operatorPrefix = StreamingApplication.APEX_PREFIX + StramElement.OPERATOR.getValue() + KEY_SEPARATOR + operator.getString("name") + "."; prop.setProperty(operatorPrefix + "classname", operator.getString("class")); JSONObject operatorProperties = operator.optJSONObject("properties"); if (operatorProperties != null) { @@ -1756,7 +1763,7 @@ public class LogicalPlanConfiguration JSONObject appAttributes = json.optJSONObject("attributes"); if (appAttributes != null) { - String attributesPrefix = StreamingApplication.DT_PREFIX + StramElement.ATTR.getValue() + KEY_SEPARATOR; + String attributesPrefix = StreamingApplication.APEX_PREFIX + StramElement.ATTR.getValue() + 
KEY_SEPARATOR; @SuppressWarnings("unchecked") Iterator iter = appAttributes.keys(); while (iter.hasNext()) { @@ -1769,7 +1776,7 @@ public class LogicalPlanConfiguration for (int i = 0; i < streamArray.length(); i++) { JSONObject stream = streamArray.getJSONObject(i); String name = stream.optString("name", "stream-" + i); - String streamPrefix = StreamingApplication.DT_PREFIX + StramElement.STREAM.getValue() + KEY_SEPARATOR + name + KEY_SEPARATOR; + String streamPrefix = StreamingApplication.APEX_PREFIX + StramElement.STREAM.getValue() + KEY_SEPARATOR + name + KEY_SEPARATOR; JSONObject source = stream.getJSONObject("source"); prop.setProperty(streamPrefix + STREAM_SOURCE, source.getString("operatorName") + KEY_SEPARATOR + source.getString("portName")); JSONArray sinks = stream.getJSONArray("sinks"); @@ -1797,7 +1804,7 @@ public class LogicalPlanConfiguration /** - * Read node configurations from opProps. The opProps can be in any + * Read operator configurations from properties. The properties can be in any * random order, as long as they represent a consistent configuration in their * entirety. * @@ -1813,7 +1820,8 @@ public class LogicalPlanConfiguration for (final String propertyName : props.stringPropertyNames()) { String propertyValue = props.getProperty(propertyName); this.properties.setProperty(propertyName, propertyValue); - if (propertyName.startsWith(StreamingApplication.DT_PREFIX)) { + if (propertyName.startsWith(StreamingApplication.DT_PREFIX) || + propertyName.startsWith(StreamingApplication.APEX_PREFIX)) { String[] keyComps = propertyName.split(KEY_SEPARATOR_SPLIT_REGEX); parseStramPropertyTokens(keyComps, 1, propertyName, propertyValue, stramConf); } @@ -2239,14 +2247,14 @@ public class LogicalPlanConfiguration /** * Populate the logical plan from the streaming application definition and configuration. * Configuration is resolved based on application alias, if any. - * @param app The {@lin StreamingApplication} to be run. 
+   * @param app The {@link StreamingApplication} to be run.
    * @param dag This will hold the {@link LogicalPlan} representation of the given {@link StreamingApplication}.
    * @param name The path of the application class in the jar.
    */
   public void prepareDAG(LogicalPlan dag, StreamingApplication app, String name)
   {
     // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attribute below is used
-    String connectAddress = conf.get(StreamingApplication.DT_PREFIX + Context.DAGContext.GATEWAY_CONNECT_ADDRESS.getName());
+    String connectAddress = conf.get(KEY_GATEWAY_CONNECT_ADDRESS);
     dag.setAttribute(Context.DAGContext.GATEWAY_CONNECT_ADDRESS, connectAddress == null ? conf.get(GATEWAY_LISTEN_ADDRESS) : connectAddress);
     DAGSetupPluginContext context = new DAGSetupPluginContext(dag, this.conf);
     if (app != null) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
index 3ec9882..ca85f5d 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
@@ -200,31 +200,25 @@ public class StramMiniClusterTest
     Properties dagProps = new Properties();
     // input module (ensure shutdown works while windows are generated)
-    dagProps.put(StreamingApplication.DT_PREFIX + "operator.numGen.classname", TestGeneratorInputOperator.class.getName());
-    dagProps.put(StreamingApplication.DT_PREFIX + "operator.numGen.maxTuples", "1");
+    dagProps.put(StreamingApplication.APEX_PREFIX + "operator.numGen.classname", TestGeneratorInputOperator.class.getName());
+    dagProps.put(StreamingApplication.APEX_PREFIX + "operator.numGen.maxTuples", "1");
-    // fake output adapter - to be ignored when determine shutdown
-    //props.put(DAGContext.DT_PREFIX + "stream.output.classname", HDFSOutputStream.class.getName());
-    //props.put(DAGContext.DT_PREFIX + "stream.output.inputNode", "module2");
-    //props.put(DAGContext.DT_PREFIX + "stream.output.filepath", "miniclustertest-testSetupShutdown.out");
+    dagProps.put(StreamingApplication.APEX_PREFIX + "operator.module1.classname", GenericTestOperator.class.getName());
-    dagProps.put(StreamingApplication.DT_PREFIX + "operator.module1.classname", GenericTestOperator.class.getName());
+    dagProps.put(StreamingApplication.APEX_PREFIX + "operator.module2.classname", GenericTestOperator.class.getName());
-    dagProps.put(StreamingApplication.DT_PREFIX + "operator.module2.classname", GenericTestOperator.class.getName());
+    dagProps.put(StreamingApplication.APEX_PREFIX + "stream.fromNumGen.source", "numGen.outport");
+    dagProps.put(StreamingApplication.APEX_PREFIX + "stream.fromNumGen.sinks", "module1.inport1");
-    dagProps.put(StreamingApplication.DT_PREFIX + "stream.fromNumGen.source", "numGen.outport");
-    dagProps.put(StreamingApplication.DT_PREFIX + "stream.fromNumGen.sinks", "module1.inport1");
+    dagProps.put(StreamingApplication.APEX_PREFIX + "stream.n1n2.source", "module1.outport1");
+    dagProps.put(StreamingApplication.APEX_PREFIX + "stream.n1n2.sinks", "module2.inport1");
-    dagProps.put(StreamingApplication.DT_PREFIX + "stream.n1n2.source", "module1.outport1");
-    dagProps.put(StreamingApplication.DT_PREFIX + "stream.n1n2.sinks", "module2.inport1");
-
-    dagProps.setProperty(StreamingApplication.DT_PREFIX + LogicalPlan.MASTER_MEMORY_MB.getName(), "128");
-    dagProps.setProperty(StreamingApplication.DT_PREFIX + LogicalPlan.CONTAINER_JVM_OPTIONS.getName(), "-Dlog4j.properties=custom_log4j.properties");
-    dagProps.setProperty(StreamingApplication.DT_PREFIX + "operator.*." + OperatorContext.MEMORY_MB.getName(), "64");
-    dagProps.setProperty(StreamingApplication.DT_PREFIX + "operator.*." + OperatorContext.VCORES.getName(), "1");
-    dagProps.setProperty(StreamingApplication.DT_PREFIX + "operator.*.port.*." + Context.PortContext.BUFFER_MEMORY_MB.getName(), "32");
-    dagProps.setProperty(StreamingApplication.DT_PREFIX + LogicalPlan.DEBUG.getName(), "true");
-    //dagProps.setProperty(StreamingApplication.DT_PREFIX + LogicalPlan.CONTAINERS_MAX_COUNT.getName(), "2");
+    dagProps.setProperty(StreamingApplication.APEX_PREFIX + LogicalPlan.MASTER_MEMORY_MB.getName(), "128");
+    dagProps.setProperty(StreamingApplication.APEX_PREFIX + LogicalPlan.CONTAINER_JVM_OPTIONS.getName(), "-Dlog4j.properties=custom_log4j.properties");
+    dagProps.setProperty(StreamingApplication.APEX_PREFIX + "operator.*." + OperatorContext.MEMORY_MB.getName(), "64");
+    dagProps.setProperty(StreamingApplication.APEX_PREFIX + "operator.*." + OperatorContext.VCORES.getName(), "1");
+    dagProps.setProperty(StreamingApplication.APEX_PREFIX + "operator.*.port.*." + Context.PortContext.BUFFER_MEMORY_MB.getName(), "32");
+    dagProps.setProperty(StreamingApplication.APEX_PREFIX + LogicalPlan.DEBUG.getName(), "true");
     LOG.info("dag properties: {}", dagProps);
     LOG.info("Initializing Client");
@@ -272,9 +266,9 @@ public class StramMiniClusterTest
     // single container topology of inline input and module
     Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + "stream.input.classname", TestGeneratorInputOperator.class.getName());
-    props.put(StreamingApplication.DT_PREFIX + "stream.input.outputNode", "module1");
-    props.put(StreamingApplication.DT_PREFIX + "module.module1.classname", GenericTestOperator.class.getName());
+    props.put(StreamingApplication.APEX_PREFIX + "stream.input.classname", TestGeneratorInputOperator.class.getName());
+    props.put(StreamingApplication.APEX_PREFIX + "stream.input.outputNode", "module1");
+    props.put(StreamingApplication.APEX_PREFIX + "module.module1.classname", GenericTestOperator.class.getName());
     LOG.info("Initializing Client");
     LogicalPlanConfiguration tb = new LogicalPlanConfiguration(new Configuration(false));

http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
index 3999ace..d3240c5 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
@@ -113,7 +113,7 @@ public class LogicalPlanConfigurationTest
   }
   /**
-   * Test read from dt-site.xml in Hadoop configuration format.
+   * Test read from configuration file in Hadoop configuration format.
    */
   @Test
   public void testLoadFromConfigXml()
@@ -217,7 +217,7 @@ public class LogicalPlanConfigurationTest
     StreamMeta s1 = dag.getStream("n1n2");
     assertNotNull(s1);
-    assertTrue("n1n2 inline", DAG.Locality.CONTAINER_LOCAL == s1.getLocality());
+    assertTrue("n1n2 locality", DAG.Locality.CONTAINER_LOCAL == s1.getLocality());
     OperatorMeta operator3 = dag.getOperatorMeta("operator3");
     assertEquals("operator3.classname", GenericTestOperator.class, operator3.getOperator().getClass());
@@ -245,6 +245,46 @@ public class LogicalPlanConfigurationTest
   }
   @Test
+  public void testLoadFromPropertiesFileWithLegacyPrefix() throws IOException
+  {
+    Properties props = new Properties();
+    String resourcePath = "/testTopologyLegacyPrefix.properties";
+    InputStream is = this.getClass().getResourceAsStream(resourcePath);
+    if (is == null) {
+      fail("Could not load " + resourcePath);
+    }
+    props.load(is);
+    LogicalPlanConfiguration pb = new LogicalPlanConfiguration(new Configuration(false)).addFromProperties(props, null);
+
+    LogicalPlan dag = new LogicalPlan();
+    pb.populateDAG(dag);
+    dag.validate();
+
+    assertEquals("number of operators", 2, dag.getAllOperators().size());
+    assertEquals("number of root operators", 1, dag.getRootOperators().size());
+
+    StreamMeta s1 = dag.getStream("s1");
+    assertNotNull(s1);
+    assertTrue("s1 locality", DAG.Locality.CONTAINER_LOCAL == s1.getLocality());
+
+    OperatorMeta o2m = dag.getOperatorMeta("o2");
+    assertEquals(GenericTestOperator.class, o2m.getOperator().getClass());
+    GenericTestOperator o2 = (GenericTestOperator)o2m.getOperator();
+    assertEquals("myStringProperty " + o2, "myStringPropertyValue", o2.getMyStringProperty());
+  }
+
+  @Test
+  public void testDeprecation()
+  {
+    String value = "bar";
+    String oldKey = StreamingApplication.DT_PREFIX + Context.DAGContext.APPLICATION_NAME.getName();
+    String newKey = LogicalPlanConfiguration.KEY_APPLICATION_NAME;
+    Configuration config = new Configuration(false);
+    config.set(oldKey, value);
+    Assert.assertEquals(value, config.get(newKey));
+  }
+
+  @Test
   public void testLoadFromJson() throws Exception
   {
     String resourcePath = "/testTopology.json";
@@ -258,7 +298,7 @@ public class LogicalPlanConfigurationTest
     JSONObject json = new JSONObject(writer.toString());
     Configuration conf = new Configuration(false);
-    conf.set(StreamingApplication.DT_PREFIX + "operator.operator3.prop.myStringProperty", "o3StringFromConf");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.operator3.prop.myStringProperty", "o3StringFromConf");
     LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
     LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");
@@ -319,11 +359,11 @@ public class LogicalPlanConfigurationTest
     String appName = "app1";
     Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + DAG.MASTER_MEMORY_MB.getName(), "123");
-    props.put(StreamingApplication.DT_PREFIX + DAG.CONTAINER_JVM_OPTIONS.getName(), "-Dlog4j.properties=custom_log4j.properties");
-    props.put(StreamingApplication.DT_PREFIX + DAG.APPLICATION_PATH.getName(), "/defaultdir");
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + "." + DAG.APPLICATION_PATH.getName(), "/otherdir");
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + "." + DAG.STREAMING_WINDOW_SIZE_MILLIS.getName(), "1000");
+    props.put(StreamingApplication.APEX_PREFIX + DAG.MASTER_MEMORY_MB.getName(), "123");
+    props.put(StreamingApplication.APEX_PREFIX + DAG.CONTAINER_JVM_OPTIONS.getName(), "-Dlog4j.properties=custom_log4j.properties");
+    props.put(StreamingApplication.APEX_PREFIX + DAG.APPLICATION_PATH.getName(), "/defaultdir");
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + "." + DAG.APPLICATION_PATH.getName(), "/otherdir");
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + "." + DAG.STREAMING_WINDOW_SIZE_MILLIS.getName(), "1000");
     LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
     dagBuilder.addFromProperties(props, null);
@@ -347,10 +387,10 @@ public class LogicalPlanConfigurationTest
   {
     String appName = "app1";
     Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".testprop1", "10");
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".prop.testprop2", "100");
-    props.put(StreamingApplication.DT_PREFIX + "application.*.prop.testprop3", "1000");
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".inncls.a", "10000");
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".testprop1", "10");
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".prop.testprop2", "100");
+    props.put(StreamingApplication.APEX_PREFIX + "application.*.prop.testprop3", "1000");
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".inncls.a", "10000");
     LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
     dagBuilder.addFromProperties(props, null);
@@ -396,20 +436,20 @@ public class LogicalPlanConfigurationTest
     Properties props = new Properties();
     // match operator by name
-    props.put(StreamingApplication.DT_PREFIX + "template.matchId1.matchIdRegExp", ".*operator1.*");
-    props.put(StreamingApplication.DT_PREFIX + "template.matchId1.stringProperty2", "stringProperty2Value-matchId1");
-    props.put(StreamingApplication.DT_PREFIX + "template.matchId1.nested.property", "nested.propertyValue-matchId1");
+    props.put(StreamingApplication.APEX_PREFIX + "template.matchId1.matchIdRegExp", ".*operator1.*");
+    props.put(StreamingApplication.APEX_PREFIX + "template.matchId1.stringProperty2", "stringProperty2Value-matchId1");
+    props.put(StreamingApplication.APEX_PREFIX + "template.matchId1.nested.property", "nested.propertyValue-matchId1");
     // match class name, lower priority
-    props.put(StreamingApplication.DT_PREFIX + "template.matchClass1.matchClassNameRegExp", ".*" + ValidationTestOperator.class.getSimpleName());
-    props.put(StreamingApplication.DT_PREFIX + "template.matchClass1.stringProperty2", "stringProperty2Value-matchClass1");
+    props.put(StreamingApplication.APEX_PREFIX + "template.matchClass1.matchClassNameRegExp", ".*" + ValidationTestOperator.class.getSimpleName());
+    props.put(StreamingApplication.APEX_PREFIX + "template.matchClass1.stringProperty2", "stringProperty2Value-matchClass1");
     // match class name
-    props.put(StreamingApplication.DT_PREFIX + "template.t2.matchClassNameRegExp", ".*" + GenericTestOperator.class.getSimpleName());
-    props.put(StreamingApplication.DT_PREFIX + "template.t2.myStringProperty", "myStringPropertyValue");
+    props.put(StreamingApplication.APEX_PREFIX + "template.t2.matchClassNameRegExp", ".*" + GenericTestOperator.class.getSimpleName());
+    props.put(StreamingApplication.APEX_PREFIX + "template.t2.myStringProperty", "myStringPropertyValue");
     // direct setting
-    props.put(StreamingApplication.DT_PREFIX + "operator.operator3.emitFormat", "emitFormatValue");
+    props.put(StreamingApplication.APEX_PREFIX + "operator.operator3.emitFormat", "emitFormatValue");
     LogicalPlan dag = new LogicalPlan();
     Operator operator1 = dag.addOperator("operator1", new ValidationTestOperator());
@@ -441,11 +481,11 @@ public class LogicalPlanConfigurationTest
   {
     Configuration conf = new Configuration(false);
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.myStringProperty", "myStringPropertyValue");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.stringArrayField", "a,b,c");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty.key1", "key1Val");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty(key1.dot)", "key1dotVal");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty(key2.dot)", "key2dotVal");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o1.prop.myStringProperty", "myStringPropertyValue");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.stringArrayField", "a,b,c");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.mapProperty.key1", "key1Val");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.mapProperty(key1.dot)", "key1dotVal");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.mapProperty(key2.dot)", "key2dotVal");
     LogicalPlan dag = new LogicalPlan();
     GenericTestOperator o1 = dag.addOperator("o1", new GenericTestOperator());
@@ -484,7 +524,7 @@ public class LogicalPlanConfigurationTest
     LogicalPlanConfiguration builder = new LogicalPlanConfiguration(conf);
     Properties properties = new Properties();
-    properties.put(StreamingApplication.DT_PREFIX + "application.TestAliasApp.class", app.getClass().getName());
+    properties.put(StreamingApplication.APEX_PREFIX + "application.TestAliasApp.class", app.getClass().getName());
     builder.addFromProperties(properties, null);
@@ -506,7 +546,7 @@ public class LogicalPlanConfigurationTest
     LogicalPlanConfiguration builder = new LogicalPlanConfiguration(conf);
     Properties properties = new Properties();
-    properties.put(StreamingApplication.DT_PREFIX + "application.TestAliasApp.class", app.getClass().getName());
+    properties.put(StreamingApplication.APEX_PREFIX + "application.TestAliasApp.class", app.getClass().getName());
     builder.addFromProperties(properties, null);
@@ -549,10 +589,10 @@ public class LogicalPlanConfigurationTest
     };
     Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
-    props.put(StreamingApplication.DT_PREFIX + "operator.*." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "2");
-    props.put(StreamingApplication.DT_PREFIX + "operator.*." + OperatorContext.STATS_LISTENERS.getName(), PartitionLoadWatch.class.getName());
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "20");
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".class", app.getClass().getName());
+    props.put(StreamingApplication.APEX_PREFIX + "operator.*." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "2");
+    props.put(StreamingApplication.APEX_PREFIX + "operator.*." + OperatorContext.STATS_LISTENERS.getName(), PartitionLoadWatch.class.getName());
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".operator.operator1." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "20");
     LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
     dagBuilder.addFromProperties(props, null);
@@ -586,9 +626,9 @@ public class LogicalPlanConfigurationTest
     };
     Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.outputport.outport1.unifier." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "2");
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.outputport.outport1.unifier." + OperatorContext.MEMORY_MB.getName(), "512");
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".class", app.getClass().getName());
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".operator.operator1.outputport.outport1.unifier." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "2");
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".operator.operator1.outputport.outport1.unifier." + OperatorContext.MEMORY_MB.getName(), "512");
     LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
     dagBuilder.addFromProperties(props, null);
@@ -723,10 +763,10 @@ public class LogicalPlanConfigurationTest
     };
     Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
-    props.put(StreamingApplication.DT_PREFIX + "operator.*.myStringProperty", "pv1");
-    props.put(StreamingApplication.DT_PREFIX + "operator.*.booleanProperty", Boolean.TRUE.toString());
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.myStringProperty", "apv1");
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".class", app.getClass().getName());
+    props.put(StreamingApplication.APEX_PREFIX + "operator.*.myStringProperty", "pv1");
+    props.put(StreamingApplication.APEX_PREFIX + "operator.*.booleanProperty", Boolean.TRUE.toString());
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".operator.operator1.myStringProperty", "apv1");
     LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
     dagBuilder.addFromProperties(props, null);
@@ -758,10 +798,10 @@ public class LogicalPlanConfigurationTest
     };
     Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
-    props.put(StreamingApplication.DT_PREFIX + "operator.*.myStringProperty", "foo ${xyz} bar ${zzz} baz");
-    props.put(StreamingApplication.DT_PREFIX + "operator.*.booleanProperty", Boolean.TRUE.toString());
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.myStringProperty", "apv1");
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".class", app.getClass().getName());
+    props.put(StreamingApplication.APEX_PREFIX + "operator.*.myStringProperty", "foo ${xyz} bar ${zzz} baz");
+    props.put(StreamingApplication.APEX_PREFIX + "operator.*.booleanProperty", Boolean.TRUE.toString());
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".operator.operator1.myStringProperty", "apv1");
     LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
@@ -788,12 +828,12 @@ public class LogicalPlanConfigurationTest
     SimpleTestApplication app = new SimpleTestApplication();
     Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.port.*." + PortContext.QUEUE_CAPACITY.getName(), "" + 16 * 1024);
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator2.inputport.inport1." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024);
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator2.outputport.outport1." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024);
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator3.port.*." + PortContext.QUEUE_CAPACITY.getName(), "" + 16 * 1024);
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator3.inputport.inport2." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024);
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".class", app.getClass().getName());
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".operator.operator1.port.*." + PortContext.QUEUE_CAPACITY.getName(), "" + 16 * 1024);
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".operator.operator2.inputport.inport1." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024);
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".operator.operator2.outputport.outport1." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024);
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".operator.operator3.port.*." + PortContext.QUEUE_CAPACITY.getName(), "" + 16 * 1024);
+    props.put(StreamingApplication.APEX_PREFIX + "application." + appName + ".operator.operator3.inputport.inport2." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024);
     LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
     dagBuilder.addFromProperties(props, null);
@@ -831,7 +871,7 @@ public class LogicalPlanConfigurationTest
     // attribute that cannot be configured
     Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + "attr.NOT_CONFIGURABLE", "value");
+    props.put(StreamingApplication.APEX_PREFIX + "attr.NOT_CONFIGURABLE", "value");
     LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
     dagBuilder.addFromProperties(props, null);
@@ -849,7 +889,7 @@ public class LogicalPlanConfigurationTest
     // invalid attribute name
     props = new Properties();
-    String invalidAttribute = StreamingApplication.DT_PREFIX + "attr.INVALID_NAME";
+    String invalidAttribute = StreamingApplication.APEX_PREFIX + "attr.INVALID_NAME";
     props.put(invalidAttribute, "value");
     try {
@@ -930,7 +970,7 @@ public class LogicalPlanConfigurationTest
   public void testTestTupleClassAttrSetFromConfig()
   {
     Configuration conf = new Configuration(false);
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.port.schemaRequiredPort.attr.TUPLE_CLASS",
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.port.schemaRequiredPort.attr.TUPLE_CLASS",
         "com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest$TestSchema");
     StreamingApplication streamingApplication = new StreamingApplication()
@@ -986,7 +1026,7 @@ public class LogicalPlanConfigurationTest
     }
     Properties props = new Properties();
-    String propName = StreamingApplication.DT_PREFIX + StramElement.ATTR.getValue() + LogicalPlanConfiguration.KEY_SEPARATOR + attributeName;
+    String propName = StreamingApplication.APEX_PREFIX + StramElement.ATTR.getValue() + LogicalPlanConfiguration.KEY_SEPARATOR + attributeName;
     props.put(propName, "5");
     SimpleTestApplicationWithName app = new SimpleTestApplicationWithName();
@@ -1015,7 +1055,7 @@ public class LogicalPlanConfigurationTest
   public void testRootLevelAmbiguousAttributeSimple()
   {
     testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
-        StreamingApplication.DT_PREFIX, null, Boolean.TRUE, true, true);
+        StreamingApplication.APEX_PREFIX, null, Boolean.TRUE, true, true);
   }
   /**
@@ -1025,7 +1065,7 @@ public class LogicalPlanConfigurationTest
   public void testApplicationLevelAmbiguousAttributeSimple()
   {
     testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
-        StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, null, Boolean.TRUE, true, true);
+        StreamingApplication.APEX_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, null, Boolean.TRUE, true, true);
   }
   /**
@@ -1035,7 +1075,7 @@ public class LogicalPlanConfigurationTest
   public void testOperatorLevelAmbiguousAttributeSimple()
   {
     testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
-        StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, null, Boolean.TRUE, true, false);
+        StreamingApplication.APEX_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, null, Boolean.TRUE, true, false);
   }
   /**
@@ -1045,7 +1085,7 @@ public class LogicalPlanConfigurationTest
   public void testPortLevelAmbiguousAttributeSimple()
   {
     testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
-        StreamingApplication.DT_PREFIX + "port" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, null, Boolean.TRUE, false, true);
+        StreamingApplication.APEX_PREFIX + "port" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, null, Boolean.TRUE, false, true);
   }
   /**
@@ -1054,7 +1094,7 @@ public class LogicalPlanConfigurationTest
   @Test
   public void testRootLevelAmbiguousAttributeComplex()
   {
-    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, StreamingApplication.DT_PREFIX,
+    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, StreamingApplication.APEX_PREFIX,
         PortContext.class.getCanonicalName(), Boolean.TRUE, false, true);
   }
@@ -1065,7 +1105,7 @@ public class LogicalPlanConfigurationTest
   public void testApplicationLevelAmbiguousAttributeComplex()
   {
     testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
-        StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, PortContext.class.getCanonicalName(),
+        StreamingApplication.APEX_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, PortContext.class.getCanonicalName(),
         Boolean.TRUE, false, true);
   }
@@ -1076,7 +1116,7 @@ public class LogicalPlanConfigurationTest
   public void testOperatorLevelAmbiguousAttributeComplex()
   {
     testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
-        StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, OperatorContext.class.getCanonicalName(),
+        StreamingApplication.APEX_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, OperatorContext.class.getCanonicalName(),
         Boolean.TRUE, true, false);
   }
@@ -1087,7 +1127,7 @@ public class LogicalPlanConfigurationTest
   public void testOperatorLevelAmbiguousAttributeComplex2()
   {
     testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
-        StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, PortContext.class.getCanonicalName(),
+        StreamingApplication.APEX_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, PortContext.class.getCanonicalName(),
         Boolean.TRUE, false, true);
   }
@@ -1098,7 +1138,7 @@ public class LogicalPlanConfigurationTest
   public void testPortLevelAmbiguousAttributeComplex()
   {
     testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
-        StreamingApplication.DT_PREFIX + "port" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, PortContext.class.getCanonicalName(),
+        StreamingApplication.APEX_PREFIX + "port" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, PortContext.class.getCanonicalName(),
         Boolean.TRUE, false, true);
   }
@@ -1116,7 +1156,7 @@ public class LogicalPlanConfigurationTest
   @Test
   public void testRootLevelAttributeSimpleNameOperator()
   {
-    simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, StreamingApplication.DT_PREFIX, true, 4096, true, true);
+    simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, StreamingApplication.APEX_PREFIX, true, 4096, true, true);
   }
   @Test
@@ -1124,19 +1164,19 @@ public class LogicalPlanConfigurationTest
   {
     MockStorageAgent mockAgent = new MockStorageAgent();
-    simpleAttributeOperatorHelper(OperatorContext.STORAGE_AGENT, StreamingApplication.DT_PREFIX, true, mockAgent, true, false);
+    simpleAttributeOperatorHelper(OperatorContext.STORAGE_AGENT, StreamingApplication.APEX_PREFIX, true, mockAgent, true, false);
   }
   @Test
   public void testRootLevelAttributeSimpleNameOperatorNoScope()
   {
-    simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, StreamingApplication.DT_PREFIX, true, 4096, true, false);
+    simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, StreamingApplication.APEX_PREFIX, true, 4096, true, false);
   }
   @Test
   public void testApplicationLevelAttributeSimpleNameOperator()
   {
-    simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
+    simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, StreamingApplication.APEX_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
         true, 4096, true, true);
   }
@@ -1190,137 +1230,137 @@ public class LogicalPlanConfigurationTest
   @Test
   public void testRootLevelAttributeSimpleNamePort()
   {
-    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX,
-        true, (Integer)4096, true, true);
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX,
+        true, 4096, true, true);
   }
   @Test
   public void testRootLevelAttributeSimpleNamePortNoScope()
   {
-    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX,
-        true, (Integer)4096, true, false);
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX,
+        true, 4096, true, false);
   }
   @Test
   public void testOperatorLevelAttributeSimpleNamePort()
   {
-    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR,
-        true, (Integer)4096, true, true);
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR,
+        true, 4096, true, true);
   }
   @Test
   public void testApplicationLevelAttributeSimpleNamePort()
   {
-    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
-        true, (Integer)4096, true, true);
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
+        true, 4096, true, true);
   }
   @Test
   public void testRootLevelAttributeComplexNamePort()
   {
-    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, false,
-        (Integer)4096, true, true);
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX, false,
+        4096, true, true);
   }
   @Test
   public void testRootLevelAttributeComplexNamePortNoScope()
   {
-    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, false,
-        (Integer)4096, true, false);
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX, false,
+        4096, true, false);
   }
   @Test
   public void testOperatorLevelAttributeComplexNamePort()
   {
-    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR,
-        false, (Integer)4096, true, true);
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR,
+        false, 4096, true, true);
   }
   @Test
   public void testApplicationLevelAttributeComplexNamePort()
   {
-    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
-        false, (Integer)4096, true, true);
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
+        false, 4096, true, true);
   }
   /* Input port tests */
   @Test
   public void testRootLevelAttributeSimpleNameInputPort()
   {
-    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, true,
-        (Integer)4096, true);
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX, true,
+        4096, true);
   }
   @Test
   public void testOperatorLevelAttributeSimpleNameInputPort()
   {
-    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, true, (Integer)4096, true);
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, true, 4096, true);
   }
   @Test
   public void testApplicationLevelAttributeSimpleNameInputPort()
   {
-    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
-        true, (Integer)4096, true);
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
+        true, 4096, true);
   }
   @Test
   public void testRootLevelAttributeComplexNameInputPort()
   {
-    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, false, (Integer)4096, true);
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX, false, 4096, true);
   }
   @Test
   public void testOperatorLevelAttributeComplexNameInputPort()
   {
-    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, false,
-        (Integer)4096, true);
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, false,
+        4096, true);
   }
   @Test
   public void testApplicationLevelAttributeComplexNameInputPort()
   {
-    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
-        false, (Integer)4096, true);
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
+        false, 4096, true);
   }
   /* Output port tests */
   @Test
   public void testRootLevelAttributeSimpleNameOutputPort()
   {
-    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, true, (Integer)4096, true);
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX, true, 4096, true);
   }
   @Test
   public void testOperatorLevelAttributeSimpleNameOutputPort()
   {
-    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, true, (Integer)4096, true);
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, true, 4096, true);
   }
   @Test
   public void testApplicationLevelAttributeSimpleNameOutputPort()
   {
-    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR +
-        "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR, true, (Integer)4096, true);
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR +
+        "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR, true, 4096, true);
   }
   @Test
   public void testRootLevelAttributeComplexNameOutputPort()
   {
-    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, false, (Integer)4096, true);
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX, false, 4096, true);
   }
   @Test
   public void testOperatorLevelAttributeComplexNameOutputPort()
   {
-    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, false, (Integer)4096, true);
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, false, 4096, true);
   }
   @Test
   public void testApplicationLevelAttributeComplexNameOutputPort()
   {
-    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR +
-        "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR, false, (Integer)4096, true);
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.APEX_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR +
+        "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR, false, 4096, true);
   }
   /* Helpers for building ports */

http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
index
7759363..6747fd7 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
@@ -709,7 +709,7 @@ public class TestModuleExpansion
     JSONObject json = new JSONObject(writer.toString());
     Configuration conf = new Configuration(false);
-    conf.set(StreamingApplication.DT_PREFIX + "operator.operator3.prop.myStringProperty", "o3StringFromConf");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.operator3.prop.myStringProperty", "o3StringFromConf");
     LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
     LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");

http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleProperties.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleProperties.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleProperties.java
index 7951e26..a556235 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleProperties.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleProperties.java
@@ -33,11 +33,11 @@ public class TestModuleProperties
   public void testModuleProperties()
   {
     Configuration conf = new Configuration(false);
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.myStringProperty", "myStringPropertyValue");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.stringArrayField", "a,b,c");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty.key1", "key1Val");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty(key1.dot)", "key1dotVal");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty(key2.dot)", "key2dotVal");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o1.prop.myStringProperty", "myStringPropertyValue");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.stringArrayField", "a,b,c");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.mapProperty.key1", "key1Val");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.mapProperty(key1.dot)", "key1dotVal");
+    conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.mapProperty(key2.dot)", "key2dotVal");
     LogicalPlan dag = new LogicalPlan();
     TestModules.GenericModule o1 = dag.addModule("o1", new TestModules.GenericModule());

http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/engine/src/test/resources/clusterTest.app.properties
----------------------------------------------------------------------
diff --git a/engine/src/test/resources/clusterTest.app.properties b/engine/src/test/resources/clusterTest.app.properties
deleted file mode 100644
index 2a0b292..0000000
--- a/engine/src/test/resources/clusterTest.app.properties
+++ /dev/null
@@ -1,44 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# run number generator with pass through operator on cluster continuously
-
-#template for operator definition
-dt.template.nt1.classname=com.datatorrent.dt.engine.GenericTestOperator
-#dt.template.nt1.logMessages=true
-
-dt.template.streamtempl1.inline=true
-
-dt.operator.operator1.template=nt1
-dt.operator.operator2.template=nt1
-
-dt.operator.inputOperator.classname=com.datatorrent.dt.engine.TestGeneratorInputOperator
-dt.operator.inputOperator.maxTuples=10
-
-#dt.operator.httpOut.classname=com.datatorrent.lib.io.HttpOutputOperator
-#dt.operator.httpOut.resourceUrl=http://localhost:9999/resourcecontext
-
-dt.stream.input1.source=inputOperator.outputPort
-dt.stream.input1.sinks=operator1.input1
-dt.stream.input1.template=streamtempl1
-
-dt.stream.n1n2.source=operator1.output1
-#dt.stream.n1n2.sinks=operator2.input1,httpOut.input
-dt.stream.n1n2.sinks=operator2.input1
-dt.stream.n1n2.template=streamtempl1

http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/engine/src/test/resources/dt-site.xml
----------------------------------------------------------------------
diff --git a/engine/src/test/resources/dt-site.xml b/engine/src/test/resources/dt-site.xml
index 20748e2..e3cffbf 100644
--- a/engine/src/test/resources/dt-site.xml
+++ b/engine/src/test/resources/dt-site.xml
@@ -34,14 +34,14 @@
-    dt.operator.operator1.classname
+    apex.operator.operator1.classname
     com.datatorrent.stram.engine.TestGeneratorInputOperator
     The root operator
-    dt.operator.operator1.myStringProperty
+    apex.operator.operator1.myStringProperty
     myStringPropertyValue
     subclass specific property
@@ -49,7 +49,7 @@
-    dt.operator.operator2.classname
+    apex.operator.operator2.classname
     com.datatorrent.stram.engine.GenericTestOperator
     Another operator, which gets input from root
@@ -57,19 +57,19 @@
-    dt.stream.n1n2.source
+    apex.stream.n1n2.source
     operator1.outport
-    dt.stream.n1n2.sinks
+    apex.stream.n1n2.sinks
     operator2.inport1
-    dt.stream.n1n2.partitionPolicy
+    apex.stream.n1n2.partitionPolicy
     someTargetPolicy
     The partition policy for this stream.
@@ -77,7 +77,7 @@
-    dt.operator.operator3.classname
+    apex.operator.operator3.classname
     com.datatorrent.stram.engine.GenericTestOperator
     Another operator, which gets input from root
@@ -85,19 +85,19 @@
-    dt.operator.operator4.classname
+    apex.operator.operator4.classname
     com.datatorrent.stram.engine.GenericTestOperator
     Another operator, which gets input from root
-    dt.stream.n2n3.source
+    apex.stream.n2n3.source
     operator2.outport1
     Operator 3 receives input from operator 2.
-    dt.stream.n2n3.sinks
+    apex.stream.n2n3.sinks
     operator3.inport1,operator4.inport1
     operator3 and operator4 receives input from operator 2.
@@ -105,18 +105,18 @@
-    dt.operator.operator5.classname
+    apex.operator.operator5.classname
     com.datatorrent.stram.engine.GenericTestOperator
-    dt.stream.n4n5.source
+    apex.stream.n4n5.source
     operator4.outport1
     Operator 5 receives input from operator 4.
-    dt.stream.n4n5.sinks
+    apex.stream.n4n5.sinks
     operator5.inport1
     Operator 5 receives input from operator 4.
@@ -125,7 +125,7 @@
-    dt.operator.operator6.classname
+    apex.operator.operator6.classname
     com.datatorrent.stram.engine.TestGeneratorInputOperator
     Another operator, which gets input from root

http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/engine/src/test/resources/testModuleTopology.properties
----------------------------------------------------------------------
diff --git a/engine/src/test/resources/testModuleTopology.properties b/engine/src/test/resources/testModuleTopology.properties
index 0679e26..8a67f36 100644
--- a/engine/src/test/resources/testModuleTopology.properties
+++ b/engine/src/test/resources/testModuleTopology.properties
@@ -18,45 +18,45 @@
 #
 # test for defining topology as property file
-dt.operator.O1.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyInputOperator
-dt.operator.O1.inputOperatorProp=1
+apex.operator.O1.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyInputOperator
+apex.operator.O1.inputOperatorProp=1

-dt.operator.O2.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyOperator
-dt.operator.O2.operatorProp=2
+apex.operator.O2.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyOperator
+apex.operator.O2.operatorProp=2

-dt.operator.Ma.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA
-dt.operator.Ma.level2ModuleAProp1=11
-dt.operator.Ma.level2ModuleAProp2=12
-dt.operator.Ma.level2ModuleAProp3=13
+apex.operator.Ma.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA
+apex.operator.Ma.level2ModuleAProp1=11
+apex.operator.Ma.level2ModuleAProp2=12
+apex.operator.Ma.level2ModuleAProp3=13

-dt.operator.Mb.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB
-dt.operator.Mb.level2ModuleBProp1=21
-dt.operator.Mb.level2ModuleBProp2=22
-dt.operator.Mb.level2ModuleBProp3=23
+apex.operator.Mb.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB
+apex.operator.Mb.level2ModuleBProp1=21
+apex.operator.Mb.level2ModuleBProp2=22
+apex.operator.Mb.level2ModuleBProp3=23

-dt.operator.Mc.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA
-dt.operator.Mc.level2ModuleAProp1=31
-dt.operator.Mc.level2ModuleAProp2=32
-dt.operator.Mc.level2ModuleAProp3=33
+apex.operator.Mc.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA
+apex.operator.Mc.level2ModuleAProp1=31
+apex.operator.Mc.level2ModuleAProp2=32
+apex.operator.Mc.level2ModuleAProp3=33

-dt.operator.Md.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB
-dt.operator.Md.level2ModuleBProp1=41
-dt.operator.Md.level2ModuleBProp2=42
-dt.operator.Md.level2ModuleBProp3=43
+apex.operator.Md.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB
+apex.operator.Md.level2ModuleBProp1=41
+apex.operator.Md.level2ModuleBProp2=42
+apex.operator.Md.level2ModuleBProp3=43

-dt.operator.Me.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level3Module
+apex.operator.Me.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level3Module

-dt.stream.O1_O2.source=O1.out
-dt.stream.O1_O2.sinks=O2.in,Me.mIn
+apex.stream.O1_O2.source=O1.out
+apex.stream.O1_O2.sinks=O2.in,Me.mIn

-dt.stream.O2_Ma.source=O2.out1
-dt.stream.O2_Ma.sinks=Ma.mIn
+apex.stream.O2_Ma.source=O2.out1
+apex.stream.O2_Ma.sinks=Ma.mIn

-dt.stream.Ma_Mb.source=Ma.mOut1
-dt.stream.Ma_Mb.sinks=Mb.mIn
+apex.stream.Ma_Mb.source=Ma.mOut1
+apex.stream.Ma_Mb.sinks=Mb.mIn

-dt.stream.Ma_Md.source=Ma.mOut2
-dt.stream.Ma_Md.sinks=Md.mIn
+apex.stream.Ma_Md.source=Ma.mOut2
+apex.stream.Ma_Md.sinks=Md.mIn

-dt.stream.Mb_Mc.source=Mb.mOut2
-dt.stream.Mb_Mc.sinks=Mc.mIn
+apex.stream.Mb_Mc.source=Mb.mOut2
+apex.stream.Mb_Mc.sinks=Mc.mIn
http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/engine/src/test/resources/testTopology.properties
----------------------------------------------------------------------
diff --git a/engine/src/test/resources/testTopology.properties b/engine/src/test/resources/testTopology.properties
index abb7040..3160424 100644
--- a/engine/src/test/resources/testTopology.properties
+++ b/engine/src/test/resources/testTopology.properties
@@ -19,29 +19,29 @@

 # test for defining topology as property file

-dt.operator.inputOperator.classname=com.datatorrent.stram.engine.TestGeneratorInputOperator
-dt.operator.inputOperator.myConfigProperty=myConfigPropertyValue
+apex.operator.inputOperator.classname=com.datatorrent.stram.engine.TestGeneratorInputOperator
+apex.operator.inputOperator.myConfigProperty=myConfigPropertyValue

-dt.operator.operator1.classname=com.datatorrent.stram.engine.GenericTestOperator
-dt.operator.operator1.myStringProperty=myStringPropertyValue
+apex.operator.operator1.classname=com.datatorrent.stram.engine.GenericTestOperator
+apex.operator.operator1.myStringProperty=myStringPropertyValue

-dt.operator.operator2.classname=com.datatorrent.stram.engine.GenericTestOperator
+apex.operator.operator2.classname=com.datatorrent.stram.engine.GenericTestOperator

 #define a template for operator definition
-dt.template.nt1.classname=com.datatorrent.stram.engine.GenericTestOperator
-dt.template.nt1.myStringProperty=myStringPropertyValueFromTemplate
+apex.template.nt1.classname=com.datatorrent.stram.engine.GenericTestOperator
+apex.template.nt1.myStringProperty=myStringPropertyValueFromTemplate

-dt.operator.operator3.template=nt1
+apex.operator.operator3.template=nt1

-dt.operator.operator4.template=nt1
-dt.operator.operator4.myStringProperty=overrideOperator4
-dt.operator.operator4.stringPropertySetterOnly=setterOnlyOperator4
-dt.operator.operator4.booleanProperty=true
+apex.operator.operator4.template=nt1
+apex.operator.operator4.myStringProperty=overrideOperator4
+apex.operator.operator4.stringPropertySetterOnly=setterOnlyOperator4
+apex.operator.operator4.booleanProperty=true

-dt.stream.n1n2.source=operator1.outport1
-dt.stream.n1n2.sinks=operator2.inport1
-dt.stream.n1n2.template=defaultstream
-dt.stream.n1n2.locality=CONTAINER_LOCAL
+apex.stream.n1n2.source=operator1.outport1
+apex.stream.n1n2.sinks=operator2.inport1
+apex.stream.n1n2.template=defaultstream
+apex.stream.n1n2.locality=CONTAINER_LOCAL

-dt.stream.inputStream.source=inputOperator.outport
-dt.stream.inputStream.sinks=operator1.inport1,operator3.inport1,operator4.inport1
+apex.stream.inputStream.source=inputOperator.outport
+apex.stream.inputStream.sinks=operator1.inport1,operator3.inport1,operator4.inport1