apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vro...@apache.org
Subject [36/50] [abbrv] incubator-apex-core git commit: APEX-28 #resolve
Date Thu, 24 Sep 2015 02:37:55 GMT
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/977093e1/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
new file mode 100644
index 0000000..077e3a9
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
@@ -0,0 +1,1511 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.stram.plan.logical;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.lang.reflect.Field;
+
+import java.util.*;
+
+import javax.validation.ValidationException;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.*;
+import com.datatorrent.api.Attribute.AttributeMap;
+import com.datatorrent.api.Attribute.AttributeMap.AttributeInitializer;
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.StringCodec.Integer2String;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+import com.datatorrent.common.codec.JsonStreamCodec;
+import com.datatorrent.common.util.BasicContainerOptConfigurator;
+import com.datatorrent.stram.PartitioningTest.PartitionLoadWatch;
+import com.datatorrent.stram.client.StramClientUtils;
+import com.datatorrent.stram.engine.GenericTestOperator;
+import com.datatorrent.stram.engine.TestGeneratorInputOperator;
+import com.datatorrent.stram.plan.SchemaTestOperator;
+import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.AttributeParseUtils;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.ConfElement;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.ContextUtils;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.StramElement;
+import com.datatorrent.stram.plan.logical.LogicalPlanTest.ValidationTestOperator;
+import com.datatorrent.stram.support.StramTestSupport.RegexMatcher;
+
+public class LogicalPlanConfigurationTest {
+
+  static {
+    @SuppressWarnings("MismatchedReadAndWriteOfArray")
+    Object serial[] = new Object[] {MockContext1.serialVersionUID, MockContext2.serialVersionUID};
+  }
+
+  private static OperatorMeta assertNode(LogicalPlan dag, String id) {
+      OperatorMeta n = dag.getOperatorMeta(id);
+      assertNotNull("operator exists id=" + id, n);
+      return n;
+  }
+
+  /**
+   * Test read from dt-site.xml in Hadoop configuration format.
+   */
+  @Test
+  public void testLoadFromConfigXml() {
+    Configuration conf = new Configuration(false);
+    conf.addResource(StramClientUtils.DT_SITE_XML_FILE);
+
+    LogicalPlanConfiguration builder = new LogicalPlanConfiguration(conf);
+
+    LogicalPlan dag = new LogicalPlan();
+    builder.populateDAG(dag);
+    dag.validate();
+
+    assertEquals("number of operator confs", 6, dag.getAllOperators().size());
+
+    OperatorMeta operator1 = assertNode(dag, "operator1");
+    OperatorMeta operator2 = assertNode(dag, "operator2");
+    OperatorMeta operator3 = assertNode(dag, "operator3");
+    OperatorMeta operator4 = assertNode(dag, "operator4");
+
+    assertNotNull("operatorConf for root", operator1);
+    assertEquals("operatorId set", "operator1", operator1.getName());
+
+    // verify operator instantiation
+    assertEquals(operator1.getOperator().getClass(), TestGeneratorInputOperator.class);
+    TestGeneratorInputOperator GenericTestNode = (TestGeneratorInputOperator)operator1.getOperator();
+    assertEquals("myStringPropertyValue", GenericTestNode.getMyStringProperty());
+
+    // check links
+    assertEquals("operator1 inputs", 0, operator1.getInputStreams().size());
+    assertEquals("operator1 outputs", 1, operator1.getOutputStreams().size());
+    StreamMeta n1n2 = operator2.getInputStreams().get(operator2.getMeta(((GenericTestOperator)operator2.getOperator()).inport1));
+    assertNotNull("n1n2", n1n2);
+
+    // output/input stream object same
+    assertEquals("rootNode out is operator2 in", n1n2, operator1.getOutputStreams().get(operator1.getMeta(((TestGeneratorInputOperator)operator1.getOperator()).outport)));
+    assertEquals("n1n2 source", operator1, n1n2.getSource().getOperatorMeta());
+    Assert.assertEquals("n1n2 targets", 1, n1n2.getSinks().size());
+    Assert.assertEquals("n1n2 target", operator2, n1n2.getSinks().get(0).getOperatorWrapper());
+
+    assertEquals("stream name", "n1n2", n1n2.getName());
+    Assert.assertEquals("n1n2 not inline (default)", null, n1n2.getLocality());
+
+    // operator 2 streams to operator 3 and operator 4
+    assertEquals("operator 2 number of outputs", 1, operator2.getOutputStreams().size());
+    StreamMeta fromNode2 = operator2.getOutputStreams().values().iterator().next();
+
+    Set<OperatorMeta> targetNodes = Sets.newHashSet();
+    for (LogicalPlan.InputPortMeta ip : fromNode2.getSinks()) {
+      targetNodes.add(ip.getOperatorWrapper());
+    }
+    Assert.assertEquals("outputs " + fromNode2, Sets.newHashSet(operator3, operator4), targetNodes);
+
+    OperatorMeta operator6 = assertNode(dag, "operator6");
+
+    List<OperatorMeta> rootNodes = dag.getRootOperators();
+    assertEquals("number root operators", 2, rootNodes.size());
+    assertTrue("root operator2", rootNodes.contains(operator1));
+    assertTrue("root operator6", rootNodes.contains(operator6));
+
+    for (OperatorMeta n : rootNodes) {
+      printTopology(n, dag, 0);
+    }
+
+  }
+
+  private void printTopology(OperatorMeta operator, DAG tplg, int level) {
+      String prefix = "";
+      if (level > 0) {
+        prefix = StringUtils.repeat(" ", 20*(level-1)) + "   |" + StringUtils.repeat("-", 17);
+      }
+      logger.debug(prefix  + operator.getName());
+      for (StreamMeta downStream : operator.getOutputStreams().values()) {
+          if (!downStream.getSinks().isEmpty()) {
+            for (LogicalPlan.InputPortMeta targetNode : downStream.getSinks()) {
+              printTopology(targetNode.getOperatorWrapper(), tplg, level+1);
+            }
+          }
+      }
+  }
+
+  @Test
+  public void testLoadFromPropertiesFile() throws IOException
+  {
+      Properties props = new Properties();
+      String resourcePath = "/testTopology.properties";
+      InputStream is = this.getClass().getResourceAsStream(resourcePath);
+      if (is == null) {
+        fail("Could not load " + resourcePath);
+      }
+      props.load(is);
+      LogicalPlanConfiguration pb = new LogicalPlanConfiguration(new Configuration(false))
+            .addFromProperties(props, null);
+
+      LogicalPlan dag = new LogicalPlan();
+      pb.populateDAG(dag);
+      dag.validate();
+
+      assertEquals("number of operator confs", 5, dag.getAllOperators().size());
+      assertEquals("number of root operators", 1, dag.getRootOperators().size());
+
+      StreamMeta s1 = dag.getStream("n1n2");
+      assertNotNull(s1);
+      assertTrue("n1n2 inline", DAG.Locality.CONTAINER_LOCAL == s1.getLocality());
+
+      OperatorMeta operator3 = dag.getOperatorMeta("operator3");
+      assertEquals("operator3.classname", GenericTestOperator.class, operator3.getOperator().getClass());
+
+      GenericTestOperator doperator3 = (GenericTestOperator)operator3.getOperator();
+      assertEquals("myStringProperty " + doperator3, "myStringPropertyValueFromTemplate", doperator3.getMyStringProperty());
+      assertFalse("booleanProperty " + doperator3, doperator3.booleanProperty);
+
+      OperatorMeta operator4 = dag.getOperatorMeta("operator4");
+      GenericTestOperator doperator4 = (GenericTestOperator)operator4.getOperator();
+      assertEquals("myStringProperty " + doperator4, "overrideOperator4", doperator4.getMyStringProperty());
+      assertEquals("setterOnlyOperator4 " + doperator4, "setterOnlyOperator4", doperator4.propertySetterOnly);
+      assertTrue("booleanProperty " + doperator4, doperator4.booleanProperty);
+
+      StreamMeta input1 = dag.getStream("inputStream");
+      assertNotNull(input1);
+      Assert.assertEquals("input1 source", dag.getOperatorMeta("inputOperator"), input1.getSource().getOperatorMeta());
+      Set<OperatorMeta> targetNodes = Sets.newHashSet();
+      for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) {
+        targetNodes.add(targetPort.getOperatorWrapper());
+      }
+
+      Assert.assertEquals("input1 target ", Sets.newHashSet(dag.getOperatorMeta("operator1"), operator3, operator4), targetNodes);
+
+  }
+
+  @Test
+  public void testLoadFromJson() throws Exception
+  {
+    String resourcePath = "/testTopology.json";
+    InputStream is = this.getClass().getResourceAsStream(resourcePath);
+    if (is == null) {
+      fail("Could not load " + resourcePath);
+    }
+    StringWriter writer = new StringWriter();
+
+    IOUtils.copy(is, writer);
+    JSONObject json = new JSONObject(writer.toString());
+
+    Configuration conf = new Configuration(false);
+    conf.set(StreamingApplication.DT_PREFIX + "operator.operator3.prop.myStringProperty", "o3StringFromConf");
+
+    LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
+    LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");
+    dag.validate();
+
+    assertEquals("DAG attribute CONTAINER_JVM_OPTIONS ", dag.getAttributes().get(DAGContext.CONTAINER_JVM_OPTIONS), "-Xmx16m");
+    Map<Class<?>, Class<? extends StringCodec<?>>> stringCodecsMap = Maps.newHashMap();
+    stringCodecsMap.put(Integer.class, Integer2String.class);
+    assertEquals("DAG attribute STRING_CODECS ", stringCodecsMap, dag.getAttributes().get(DAGContext.STRING_CODECS));
+    assertEquals("DAG attribute CONTAINER_OPTS_CONFIGURATOR ", BasicContainerOptConfigurator.class, dag.getAttributes().get(DAGContext.CONTAINER_OPTS_CONFIGURATOR).getClass());
+
+    assertEquals("number of operator confs", 5, dag.getAllOperators().size());
+    assertEquals("number of root operators", 1, dag.getRootOperators().size());
+
+    StreamMeta s1 = dag.getStream("n1n2");
+    assertNotNull(s1);
+    assertTrue("n1n2 inline", DAG.Locality.CONTAINER_LOCAL == s1.getLocality());
+
+    OperatorMeta input = dag.getOperatorMeta("inputOperator");
+    TestStatsListener tsl = new TestStatsListener();
+    tsl.setIntProp(222);
+    List<StatsListener> sll = Lists.<StatsListener>newArrayList(tsl);
+    assertEquals("inputOperator STATS_LISTENERS attribute ", sll, input.getAttributes().get(OperatorContext.STATS_LISTENERS));
+    for(OutputPortMeta opm : input.getOutputStreams().keySet()){
+      assertTrue("output port of input Operator attribute is JsonStreamCodec ", opm.getAttributes().get(PortContext.STREAM_CODEC) instanceof JsonStreamCodec<?>);
+    }
+
+    OperatorMeta operator3 = dag.getOperatorMeta("operator3");
+    assertEquals("operator3.classname", GenericTestOperator.class, operator3.getOperator().getClass());
+
+    GenericTestOperator doperator3 = (GenericTestOperator)operator3.getOperator();
+    assertEquals("myStringProperty " + doperator3, "o3StringFromConf", doperator3.getMyStringProperty());
+    assertFalse("booleanProperty " + doperator3, doperator3.booleanProperty);
+
+    OperatorMeta operator4 = dag.getOperatorMeta("operator4");
+    GenericTestOperator doperator4 = (GenericTestOperator)operator4.getOperator();
+    assertEquals("myStringProperty " + doperator4, "overrideOperator4", doperator4.getMyStringProperty());
+    assertEquals("setterOnlyOperator4 " + doperator4, "setterOnlyOperator4", doperator4.propertySetterOnly);
+    assertTrue("booleanProperty " + doperator4, doperator4.booleanProperty);
+
+    StreamMeta input1 = dag.getStream("inputStream");
+    assertNotNull(input1);
+    OperatorMeta inputOperator = dag.getOperatorMeta("inputOperator");
+    Assert.assertEquals("input1 source", inputOperator, input1.getSource().getOperatorMeta());
+    Set<OperatorMeta> targetNodes = Sets.newHashSet();
+    for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) {
+      targetNodes.add(targetPort.getOperatorWrapper());
+    }
+    Assert.assertEquals("operator attribute " + inputOperator, 64, (int)inputOperator.getValue(OperatorContext.MEMORY_MB));
+    Assert.assertEquals("port attribute " + inputOperator, 8, (int)input1.getSource().getValue(PortContext.UNIFIER_LIMIT));
+    Assert.assertEquals("input1 target ", Sets.newHashSet(dag.getOperatorMeta("operator1"), operator3, operator4), targetNodes);
+  }
+
+  @Test
+  @SuppressWarnings("UnnecessaryBoxing")
+  public void testAppLevelAttributes()
+  {
+    String appName = "app1";
+
+    Properties props = new Properties();
+    props.put(StreamingApplication.DT_PREFIX + DAG.MASTER_MEMORY_MB.getName(), "123");
+    props.put(StreamingApplication.DT_PREFIX + DAG.CONTAINER_JVM_OPTIONS.getName(), "-Dlog4j.properties=custom_log4j.properties");
+    props.put(StreamingApplication.DT_PREFIX + DAG.APPLICATION_PATH.getName(), "/defaultdir");
+    props.put(StreamingApplication.DT_PREFIX + "application." + appName + "." + DAG.APPLICATION_PATH.getName(), "/otherdir");
+    props.put(StreamingApplication.DT_PREFIX + "application." + appName + "." + DAG.STREAMING_WINDOW_SIZE_MILLIS.getName(), "1000");
+
+    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
+    dagBuilder.addFromProperties(props, null);
+
+    LogicalPlan dag = new LogicalPlan();
+
+    dagBuilder.populateDAG(dag);
+
+    dagBuilder.setApplicationConfiguration(dag, appName, null);
+
+    Assert.assertEquals("", "/otherdir", dag.getValue(DAG.APPLICATION_PATH));
+    Assert.assertEquals("", Integer.valueOf(123), dag.getValue(DAG.MASTER_MEMORY_MB));
+    Assert.assertEquals("", Integer.valueOf(1000), dag.getValue(DAG.STREAMING_WINDOW_SIZE_MILLIS));
+    Assert.assertEquals("", "-Dlog4j.properties=custom_log4j.properties", dag.getValue(DAG.CONTAINER_JVM_OPTIONS));
+
+  }
+  @Test
+  @SuppressWarnings("UnnecessaryBoxing")
+  public void testAppLevelProperties() {
+	  String appName ="app1";
+	  Properties props =new Properties();
+	  props.put(StreamingApplication.DT_PREFIX + "application."+appName+".testprop1","10");
+	  props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".prop.testprop2", "100");
+	  props.put(StreamingApplication.DT_PREFIX + "application.*.prop.testprop3","1000");
+	  props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".inncls.a", "10000");
+	  LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
+    dagBuilder.addFromProperties(props, null);
+
+	  LogicalPlan dag = new LogicalPlan();
+	  TestApplication app1Test=new TestApplication();
+
+	  dagBuilder.setApplicationConfiguration(dag, appName,app1Test);
+	  Assert.assertEquals("",Integer.valueOf(10),app1Test.getTestprop1());
+	  Assert.assertEquals("",Integer.valueOf(100),app1Test.getTestprop2());
+	  Assert.assertEquals("",Integer.valueOf(1000),app1Test.getTestprop3());
+	  Assert.assertEquals("",Integer.valueOf(10000),app1Test.getInncls().getA());
+  }
+
+  @Test
+  public void testPrepareDAG() {
+    final MutableBoolean appInitialized = new MutableBoolean(false);
+    StreamingApplication app = new StreamingApplication() {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+        Assert.assertEquals("", "hostname:9090", dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS));
+        dag.setAttribute(DAG.GATEWAY_CONNECT_ADDRESS, "hostname:9091");
+        appInitialized.setValue(true);
+      }
+    };
+    Configuration conf = new Configuration(false);
+    conf.addResource(StramClientUtils.DT_SITE_XML_FILE);
+    LogicalPlanConfiguration pb = new LogicalPlanConfiguration(conf);
+
+    LogicalPlan dag = new LogicalPlan();
+    pb.prepareDAG(dag, app, "testconfig");
+
+    Assert.assertTrue("populateDAG called", appInitialized.booleanValue());
+    Assert.assertEquals("populateDAG overrides attribute", "hostname:9091", dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS));
+  }
+
+  @Test
+  public void testOperatorConfigurationLookup() {
+
+    Properties props = new Properties();
+
+    // match operator by name
+    props.put(StreamingApplication.DT_PREFIX + "template.matchId1.matchIdRegExp", ".*operator1.*");
+    props.put(StreamingApplication.DT_PREFIX + "template.matchId1.stringProperty2", "stringProperty2Value-matchId1");
+    props.put(StreamingApplication.DT_PREFIX + "template.matchId1.nested.property", "nested.propertyValue-matchId1");
+
+    // match class name, lower priority
+    props.put(StreamingApplication.DT_PREFIX + "template.matchClass1.matchClassNameRegExp", ".*" + ValidationTestOperator.class.getSimpleName());
+    props.put(StreamingApplication.DT_PREFIX + "template.matchClass1.stringProperty2", "stringProperty2Value-matchClass1");
+
+    // match class name
+    props.put(StreamingApplication.DT_PREFIX + "template.t2.matchClassNameRegExp", ".*"+GenericTestOperator.class.getSimpleName());
+    props.put(StreamingApplication.DT_PREFIX + "template.t2.myStringProperty", "myStringPropertyValue");
+
+    // direct setting
+    props.put(StreamingApplication.DT_PREFIX + "operator.operator3.emitFormat", "emitFormatValue");
+
+    LogicalPlan dag = new LogicalPlan();
+    Operator operator1 = dag.addOperator("operator1", new ValidationTestOperator());
+    Operator operator2 = dag.addOperator("operator2", new ValidationTestOperator());
+    Operator operator3 = dag.addOperator("operator3", new GenericTestOperator());
+
+    LogicalPlanConfiguration pb = new LogicalPlanConfiguration(new Configuration(false));
+    LOG.debug("calling addFromProperties");
+    pb.addFromProperties(props, null);
+
+    Map<String, String> configProps = pb.getProperties(dag.getMeta(operator1), "appName");
+    Assert.assertEquals("" + configProps, 2, configProps.size());
+    Assert.assertEquals("" + configProps, "stringProperty2Value-matchId1", configProps.get("stringProperty2"));
+    Assert.assertEquals("" + configProps, "nested.propertyValue-matchId1", configProps.get("nested.property"));
+
+    configProps = pb.getProperties(dag.getMeta(operator2), "appName");
+    Assert.assertEquals("" + configProps, 1, configProps.size());
+    Assert.assertEquals("" + configProps, "stringProperty2Value-matchClass1", configProps.get("stringProperty2"));
+
+    configProps = pb.getProperties(dag.getMeta(operator3), "appName");
+    Assert.assertEquals("" + configProps, 2, configProps.size());
+    Assert.assertEquals("" + configProps, "myStringPropertyValue", configProps.get("myStringProperty"));
+    Assert.assertEquals("" + configProps, "emitFormatValue", configProps.get("emitFormat"));
+
+  }
+
+  @Test
+  public void testSetOperatorProperties() {
+
+    Configuration conf = new Configuration(false);
+    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.myStringProperty", "myStringPropertyValue");
+    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.stringArrayField", "a,b,c");
+    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty.key1", "key1Val");
+    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty(key1.dot)", "key1dotVal");
+    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty(key2.dot)", "key2dotVal");
+
+    LogicalPlan dag = new LogicalPlan();
+    GenericTestOperator o1 = dag.addOperator("o1", new GenericTestOperator());
+    ValidationTestOperator o2 = dag.addOperator("o2", new ValidationTestOperator());
+
+    LogicalPlanConfiguration pb = new LogicalPlanConfiguration(conf);
+
+    pb.setOperatorProperties(dag, "testSetOperatorProperties");
+    Assert.assertEquals("o1.myStringProperty", "myStringPropertyValue", o1.getMyStringProperty());
+    Assert.assertArrayEquals("o2.stringArrayField", new String[] {"a", "b", "c"}, o2.getStringArrayField());
+
+    Assert.assertEquals("o2.mapProperty.key1", "key1Val", o2.getMapProperty().get("key1"));
+    Assert.assertEquals("o2.mapProperty(key1.dot)", "key1dotVal", o2.getMapProperty().get("key1.dot"));
+    Assert.assertEquals("o2.mapProperty(key2.dot)", "key2dotVal", o2.getMapProperty().get("key2.dot"));
+
+  }
+
+  @ApplicationAnnotation(name="AnnotatedAlias")
+  class AnnotatedApplication implements StreamingApplication {
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+    }
+
+  }
+
+  @Test
+  public void testAppNameAttribute() {
+    StreamingApplication app = new AnnotatedApplication();
+    Configuration conf = new Configuration(false);
+    conf.addResource(StramClientUtils.DT_SITE_XML_FILE);
+
+    LogicalPlanConfiguration builder = new LogicalPlanConfiguration(conf);
+
+    Properties properties = new Properties();
+    properties.put(StreamingApplication.DT_PREFIX + "application.TestAliasApp.class", app.getClass().getName());
+
+    builder.addFromProperties(properties, null);
+
+    LogicalPlan dag = new LogicalPlan();
+    String appPath = app.getClass().getName().replace(".", "/") + ".class";
+    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_NAME, "testApp");
+    builder.prepareDAG(dag, app, appPath);
+
+    Assert.assertEquals("Application name", "testApp", dag.getAttributes().get(com.datatorrent.api.Context.DAGContext.APPLICATION_NAME));
+  }
+
+  @Test
+  public void testAppAlias() {
+    StreamingApplication app = new AnnotatedApplication();
+    Configuration conf = new Configuration(false);
+    conf.addResource(StramClientUtils.DT_SITE_XML_FILE);
+
+    LogicalPlanConfiguration builder = new LogicalPlanConfiguration(conf);
+
+    Properties properties = new Properties();
+    properties.put(StreamingApplication.DT_PREFIX + "application.TestAliasApp.class", app.getClass().getName());
+
+    builder.addFromProperties(properties, null);
+
+    LogicalPlan dag = new LogicalPlan();
+    String appPath = app.getClass().getName().replace(".", "/") + ".class";
+    builder.prepareDAG(dag, app, appPath);
+
+    Assert.assertEquals("Application name", "TestAliasApp", dag.getAttributes().get(com.datatorrent.api.Context.DAGContext.APPLICATION_NAME));
+  }
+
+
+  @Test
+  public void testAppAnnotationAlias() {
+    StreamingApplication app = new AnnotatedApplication();
+    Configuration conf = new Configuration(false);
+    conf.addResource(StramClientUtils.DT_SITE_XML_FILE);
+
+    LogicalPlanConfiguration builder = new LogicalPlanConfiguration(conf);
+
+    LogicalPlan dag = new LogicalPlan();
+    String appPath = app.getClass().getName().replace(".", "/") + ".class";
+    builder.prepareDAG(dag, app, appPath);
+
+    Assert.assertEquals("Application name", "AnnotatedAlias", dag.getAttributes().get(com.datatorrent.api.Context.DAGContext.APPLICATION_NAME));
+  }
+
+  @Test
+  @SuppressWarnings( {"UnnecessaryBoxing", "AssertEqualsBetweenInconvertibleTypes"})
+  public void testOperatorLevelAttributes() {
+    String appName = "app1";
+    StreamingApplication app = new StreamingApplication() {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+        dag.addOperator("operator1", GenericTestOperator.class);
+        dag.addOperator("operator2", GenericTestOperator.class);
+      }
+    };
+
+    Properties props = new Properties();
+    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
+    props.put(StreamingApplication.DT_PREFIX + "operator.*." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "2");
+    props.put(StreamingApplication.DT_PREFIX + "operator.*." + OperatorContext.STATS_LISTENERS.getName(), PartitionLoadWatch.class.getName());
+    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "20");
+
+    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
+    dagBuilder.addFromProperties(props, null);
+
+    String appPath = app.getClass().getName().replace(".", "/") + ".class";
+
+    LogicalPlan dag = new LogicalPlan();
+    dagBuilder.prepareDAG(dag, app, appPath);
+
+    Assert.assertEquals("", Integer.valueOf(20), dag.getOperatorMeta("operator1").getValue(OperatorContext.APPLICATION_WINDOW_COUNT));
+    Assert.assertEquals("", Integer.valueOf(2), dag.getOperatorMeta("operator2").getValue(OperatorContext.APPLICATION_WINDOW_COUNT));
+    Assert.assertEquals("", PartitionLoadWatch.class, dag.getOperatorMeta("operator2").getValue(OperatorContext.STATS_LISTENERS).toArray()[0].getClass());
+  }
+
+  @Test
+  public void testOperatorLevelProperties() {
+    String appName = "app1";
+    final GenericTestOperator operator1 = new GenericTestOperator();
+    final GenericTestOperator operator2 = new GenericTestOperator();
+    StreamingApplication app = new StreamingApplication() {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+        dag.addOperator("operator1", operator1);
+        dag.addOperator("operator2", operator2);
+      }
+    };
+
+    Properties props = new Properties();
+    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
+    props.put(StreamingApplication.DT_PREFIX + "operator.*.myStringProperty", "pv1");
+    props.put(StreamingApplication.DT_PREFIX + "operator.*.booleanProperty", Boolean.TRUE.toString());
+    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.myStringProperty", "apv1");
+
+    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
+    dagBuilder.addFromProperties(props, null);
+
+    String appPath = app.getClass().getName().replace(".", "/") + ".class";
+
+    LogicalPlan dag = new LogicalPlan();
+    dagBuilder.prepareDAG(dag, app, appPath);
+
+    Assert.assertEquals("apv1", operator1.getMyStringProperty());
+    Assert.assertEquals("pv1", operator2.getMyStringProperty());
+    Assert.assertEquals(true, operator2.isBooleanProperty());
+  }
+
+  @Test
+  public void testApplicationLevelParameter()
+  {
+    String appName = "app1";
+    final GenericTestOperator operator1 = new GenericTestOperator();
+    final GenericTestOperator operator2 = new GenericTestOperator();
+    StreamingApplication app = new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+        dag.addOperator("operator1", operator1);
+        dag.addOperator("operator2", operator2);
+      }
+    };
+
+    Properties props = new Properties();
+    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
+    props.put(StreamingApplication.DT_PREFIX + "operator.*.myStringProperty", "foo ${xyz} bar ${zzz} baz");
+    props.put(StreamingApplication.DT_PREFIX + "operator.*.booleanProperty", Boolean.TRUE.toString());
+    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.myStringProperty", "apv1");
+
+    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
+
+    Configuration vars = new Configuration(false);
+    vars.set("xyz", "123");
+    vars.set("zzz", "456");
+    dagBuilder.addFromProperties(props, vars);
+
+    String appPath = app.getClass().getName().replace(".", "/") + ".class";
+
+    LogicalPlan dag = new LogicalPlan();
+    dagBuilder.prepareDAG(dag, app, appPath);
+
+    Assert.assertEquals("apv1", operator1.getMyStringProperty());
+    Assert.assertEquals("foo 123 bar 456 baz", operator2.getMyStringProperty());
+    Assert.assertEquals(true, operator2.isBooleanProperty());
+  }
+
+  @Test
+  @SuppressWarnings("UnnecessaryBoxing")
+  public void testPortLevelAttributes() {
+    String appName = "app1";
+    SimpleTestApplication app = new SimpleTestApplication();
+
+    Properties props = new Properties();
+    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
+    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.port.*." + PortContext.QUEUE_CAPACITY.getName(), "" + 16 * 1024);
+    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator2.inputport.inport1." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024);
+    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator2.outputport.outport1." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024);
+    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator3.port.*." + PortContext.QUEUE_CAPACITY.getName(), "" + 16 * 1024);
+    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator3.inputport.inport2." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024);
+
+    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
+    dagBuilder.addFromProperties(props, null);
+
+    String appPath = app.getClass().getName().replace(".", "/") + ".class";
+
+    LogicalPlan dag = new LogicalPlan();
+    dagBuilder.prepareDAG(dag, app, appPath);
+
+    OperatorMeta om1 = dag.getOperatorMeta("operator1");
+    Assert.assertEquals("", Integer.valueOf(16 * 1024), om1.getMeta(app.gt1.outport1).getValue(PortContext.QUEUE_CAPACITY));
+    OperatorMeta om2 = dag.getOperatorMeta("operator2");
+    Assert.assertEquals("", Integer.valueOf(32 * 1024), om2.getMeta(app.gt2.inport1).getValue(PortContext.QUEUE_CAPACITY));
+    Assert.assertEquals("", Integer.valueOf(32 * 1024), om2.getMeta(app.gt2.outport1).getValue(PortContext.QUEUE_CAPACITY));
+    OperatorMeta om3 = dag.getOperatorMeta("operator3");
+    Assert.assertEquals("", Integer.valueOf(16 * 1024), om3.getMeta(app.gt3.inport1).getValue(PortContext.QUEUE_CAPACITY));
+    Assert.assertEquals("", Integer.valueOf(32 * 1024), om3.getMeta(app.gt3.inport2).getValue(PortContext.QUEUE_CAPACITY));
+  }
+
+
+  @Test
+  public void testInvalidAttribute() throws Exception {
+    Assert.assertNotSame(0, com.datatorrent.api.Context.DAGContext.serialVersionUID);
+    Attribute<String> attribute = new Attribute<>("", null);
+
+    Field nameField = Attribute.class.getDeclaredField("name");
+    nameField.setAccessible(true);
+    nameField.set(attribute, "NOT_CONFIGURABLE");
+    nameField.setAccessible(false);
+
+    ContextUtils.addAttribute(com.datatorrent.api.Context.DAGContext.class, attribute);
+    AttributeParseUtils.initialize();
+    ConfElement.initialize();
+
+    // attribute that cannot be configured
+
+    Properties props = new Properties();
+    props.put(StreamingApplication.DT_PREFIX + "attr.NOT_CONFIGURABLE", "value");
+
+    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
+    dagBuilder.addFromProperties(props, null);
+
+    try {
+      dagBuilder.prepareDAG(new LogicalPlan(), null, "");
+      Assert.fail("Exception expected");
+    } catch (Exception e) {
+      Assert.assertThat("Attribute not configurable", e.getMessage(), RegexMatcher.matches("Attribute does not support property configuration: NOT_CONFIGURABLE.*"));
+    }
+
+    ContextUtils.removeAttribute(com.datatorrent.api.Context.DAGContext.class, attribute);
+    AttributeParseUtils.initialize();
+    ConfElement.initialize();
+
+    // invalid attribute name
+    props = new Properties();
+    String invalidAttribute = StreamingApplication.DT_PREFIX + "attr.INVALID_NAME";
+    props.put(invalidAttribute, "value");
+
+    try {
+      new LogicalPlanConfiguration(new Configuration(false)).addFromProperties(props, null);
+      Assert.fail("Exception expected");
+    } catch (Exception e) {
+      LOG.debug("Exception message: {}", e);
+      Assert.assertThat("Invalid attribute name", e.getMessage(), RegexMatcher.matches("Invalid attribute reference: " + invalidAttribute));
+    }
+  }
+
+  @Test
+  public void testAttributesCodec() {
+    Assert.assertNotSame(null, new Long[] {com.datatorrent.api.Context.DAGContext.serialVersionUID, OperatorContext.serialVersionUID, PortContext.serialVersionUID});
+    @SuppressWarnings("unchecked")
+    Set<Class<? extends Context>> contextClasses = Sets.newHashSet(com.datatorrent.api.Context.DAGContext.class, OperatorContext.class, PortContext.class);
+    for (Class<?> c : contextClasses) {
+      for (Attribute<Object> attr : AttributeInitializer.getAttributes(c)) {
+        Assert.assertNotNull(attr.name + " codec", attr.codec);
+      }
+    }
+  }
+
+  @Test
+  public void testTupleClassAttr() throws Exception
+  {
+    String resourcePath = "/schemaTestTopology.json";
+    InputStream is = this.getClass().getResourceAsStream(resourcePath);
+    if (is == null) {
+      fail("Could not load " + resourcePath);
+    }
+    StringWriter writer = new StringWriter();
+
+    IOUtils.copy(is, writer);
+    JSONObject json = new JSONObject(writer.toString());
+
+    Configuration conf = new Configuration(false);
+
+    LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
+    LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");
+    dag.validate();
+
+    OperatorMeta operator1 = dag.getOperatorMeta("operator1");
+    assertEquals("operator1.classname", SchemaTestOperator.class, operator1.getOperator().getClass());
+
+    StreamMeta input1 = dag.getStream("inputStream");
+    assertNotNull(input1);
+    for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) {
+      Assert.assertEquals("tuple class name required", TestSchema.class, targetPort.getAttributes().get(PortContext.TUPLE_CLASS));
+    }
+  }
+
+  @Test(expected = ValidationException.class)
+  public void testTupleClassAttrValidation() throws Exception
+  {
+    String resourcePath = "/schemaTestTopology.json";
+    InputStream is = this.getClass().getResourceAsStream(resourcePath);
+    if (is == null) {
+      fail("Could not load " + resourcePath);
+    }
+    StringWriter writer = new StringWriter();
+
+    IOUtils.copy(is, writer);
+    JSONObject json = new JSONObject(writer.toString());
+
+    //removing schema so that validation fails
+    json.getJSONArray("streams").getJSONObject(0).remove("schema");
+    Configuration conf = new Configuration(false);
+
+    LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
+    LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");
+
+    dag.validate();
+  }
+
+  @Test
+  public void testTestTupleClassAttrSetFromConfig()
+  {
+    Configuration conf = new Configuration(false);
+    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.port.schemaRequiredPort.attr.TUPLE_CLASS",
+      "com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest$TestSchema");
+
+    StreamingApplication streamingApplication = new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+        TestGeneratorInputOperator o1 = dag.addOperator("o1", new TestGeneratorInputOperator());
+        SchemaTestOperator o2 = dag.addOperator("o2", new SchemaTestOperator());
+        dag.addStream("stream", o1.outport, o2.schemaRequiredPort);
+      }
+    };
+    LogicalPlan dag = new LogicalPlan();
+    LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf);
+    lpc.prepareDAG(dag, streamingApplication, "app");
+    dag.validate();
+  }
+
+  /*
+   * This test and all of the following ambiguous attribute tests verify that when an ambiguous attribute
+   * name is provided, all the corresponding attributes are set.
+   * <br/><br/>
+   * <b>Note:</b> Ambiguous attribute means that when multiple attributes with the same
+   * simple name exist for multiple types of dag elements (like operators and ports).
+   * An example of such attributes are the com.datatorrent.api.Context.OperatorContext.AUTO_RECORD
+   * and com.datatorrent.api.Context.PortContext.AUTO_RECORD.
+   * <br/><br/>
+   * This test should set the attribute on the operators and ports.
+   */
+  /**
+   * This test checks if an ambiguous DAG attribute does not get set on operators.
+   */
+  @Test
+  public void testDagAttributeNotSetOnOperator()
+  {
+    dagOperatorAttributeHelper(true);
+  }
+
+  @Test
+  public void testAmbiguousAttributeSetOnOperatorAndNotDAG()
+  {
+    dagOperatorAttributeHelper(false);
+  }
+
+  private void dagOperatorAttributeHelper(boolean attrOnDag)
+  {
+    String attributeName = null;
+
+    if (attrOnDag) {
+      attributeName = DAGContext.CHECKPOINT_WINDOW_COUNT.getSimpleName();
+    }
+    else {
+      attributeName = OperatorContext.class.getCanonicalName() + LogicalPlanConfiguration.KEY_SEPARATOR + DAGContext.CHECKPOINT_WINDOW_COUNT.getSimpleName();
+    }
+
+    Properties props = new Properties();
+    String propName = StreamingApplication.DT_PREFIX + StramElement.ATTR.getValue() + LogicalPlanConfiguration.KEY_SEPARATOR + attributeName;
+    props.put(propName, "5");
+
+    SimpleTestApplicationWithName app = new SimpleTestApplicationWithName();
+
+    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
+    dagBuilder.addFromProperties(props, null);
+
+    String appPath = app.getClass().getName().replace(".", "/") + ".class";
+
+    LogicalPlan dag = new LogicalPlan();
+    dagBuilder.prepareDAG(dag, app, appPath);
+
+    OperatorMeta om1 = dag.getOperatorMeta("operator1");
+
+    if (attrOnDag) {
+      Assert.assertNotEquals((Integer)5, om1.getValue(OperatorContext.CHECKPOINT_WINDOW_COUNT));
+    } else {
+      Assert.assertEquals((Integer)5, om1.getValue(OperatorContext.CHECKPOINT_WINDOW_COUNT));
+    }
+  }
+
+  /**
+   * This test should set the attribute on the operators and ports.
+   */
+  @Test
+  public void testRootLevelAmbiguousAttributeSimple()
+  {
+    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
+                                       StreamingApplication.DT_PREFIX, null, Boolean.TRUE, true, true);
+  }
+
+  /**
+   * This test should set the attribute on the operators and ports.
+   */
+  @Test
+  public void testApplicationLevelAmbiguousAttributeSimple()
+  {
+    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
+                                       StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR +
+                                       "*" + LogicalPlanConfiguration.KEY_SEPARATOR, null, Boolean.TRUE, true, true);
+  }
+
+  /**
+   * This should only set the attribute on the operator
+   */
+  @Test
+  public void testOperatorLevelAmbiguousAttributeSimple()
+  {
+    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
+                                       StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR +
+                                       "*" + LogicalPlanConfiguration.KEY_SEPARATOR, null, Boolean.TRUE, true, false);
+  }
+
+  /**
+   * This should only set the attribute on the port
+   */
+  @Test
+  public void testPortLevelAmbiguousAttributeSimple()
+  {
+    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
+                                       StreamingApplication.DT_PREFIX + "port" + LogicalPlanConfiguration.KEY_SEPARATOR +
+                                       "*" + LogicalPlanConfiguration.KEY_SEPARATOR, null, Boolean.TRUE, false, true);
+  }
+
+  /**
+   * This test should set the attribute on the operators and ports.
+   */
+  @Test
+  public void testRootLevelAmbiguousAttributeComplex()
+  {
+    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, StreamingApplication.DT_PREFIX,
+                                       PortContext.class.getCanonicalName(), Boolean.TRUE, false, true);
+  }
+
+  /**
+   * This test should set the attribute on the operators and ports.
+   */
+  @Test
+  public void testApplicationLevelAmbiguousAttributeComplex()
+  {
+    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
+                                       StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR +
+                                       "*" + LogicalPlanConfiguration.KEY_SEPARATOR, PortContext.class.getCanonicalName(),
+                                       Boolean.TRUE, false, true);
+  }
+
+  /**
+   * This should only set the attribute on the operator
+   */
+  @Test
+  public void testOperatorLevelAmbiguousAttributeComplex()
+  {
+    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
+                                       StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR +
+                                       "*" + LogicalPlanConfiguration.KEY_SEPARATOR, OperatorContext.class.getCanonicalName(),
+                                       Boolean.TRUE, true, false);
+  }
+
+  /**
+   * This should only set the attribute on the port
+   */
+  @Test
+  public void testOperatorLevelAmbiguousAttributeComplex2()
+  {
+    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
+                                       StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR +
+                                       "*" + LogicalPlanConfiguration.KEY_SEPARATOR, PortContext.class.getCanonicalName(),
+                                       Boolean.TRUE, false, true);
+  }
+
+  /**
+   * This should only set the attribute on the port
+   */
+  @Test
+  public void testPortLevelAmbiguousAttributeComplex()
+  {
+    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD,
+                                       StreamingApplication.DT_PREFIX + "port" + LogicalPlanConfiguration.KEY_SEPARATOR +
+                                       "*" + LogicalPlanConfiguration.KEY_SEPARATOR, PortContext.class.getCanonicalName(),
+                                       Boolean.TRUE, false, true);
+  }
+
+  private void testAttributeAmbiguousSimpleHelper(Attribute<?> attributeObjOperator, Attribute<?> attributeObjPort,
+                                                  String root, String contextClass, Object val, boolean operatorSet,
+                                                  boolean portSet)
+  {
+    Properties props = propertiesBuilder(attributeObjOperator.getSimpleName(), root, contextClass, val);
+
+    simpleAttributeOperatorHelperAssert(attributeObjOperator, props, val, operatorSet);
+
+    simpleNamePortAssertHelperAssert(attributeObjPort, props, val, portSet);
+  }
+
+  @Test
+  public void testRootLevelAttributeSimpleNameOperator()
+  {
+    simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, StreamingApplication.DT_PREFIX, true, 4096, true, true);
+  }
+
+  @Test
+  public void testRootLevelStorageAgentSimpleNameOperator()
+  {
+    MockStorageAgent mockAgent = new MockStorageAgent();
+
+    simpleAttributeOperatorHelper(OperatorContext.STORAGE_AGENT, StreamingApplication.DT_PREFIX, true, mockAgent, true, false);
+  }
+
+  @Test
+  public void testRootLevelAttributeSimpleNameOperatorNoScope()
+  {
+    simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, StreamingApplication.DT_PREFIX, true, 4096, true, false);
+  }
+
+  @Test
+  public void testApplicationLevelAttributeSimpleNameOperator()
+  {
+    simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, StreamingApplication.DT_PREFIX + "application" +
+                                  LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
+                                  true, 4096, true, true);
+  }
+
+  private void simpleAttributeOperatorHelper(Attribute<?> attributeObj, String root, boolean simpleName,
+                                             Object val, boolean set, boolean scope)
+  {
+    Properties props = propertiesBuilderOperator(attributeObj.getSimpleName(), root, simpleName,
+                                                 val, scope);
+
+    simpleAttributeOperatorHelperAssert(attributeObj, props, val, set);
+  }
+
+  private void simpleAttributeOperatorHelperAssert(Attribute<?> attributeObj, Properties props, Object val, boolean set)
+  {
+    SimpleTestApplicationWithName app = new SimpleTestApplicationWithName();
+
+    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
+    dagBuilder.addFromProperties(props, null);
+
+    String appPath = app.getClass().getName().replace(".", "/") + ".class";
+
+    LogicalPlan dag = new LogicalPlan();
+    dagBuilder.prepareDAG(dag, app, appPath);
+
+    OperatorMeta om1 = dag.getOperatorMeta("operator1");
+
+    if (set) {
+      Assert.assertEquals(val, om1.getValue(attributeObj));
+    } else {
+      Assert.assertNotEquals(val, om1.getValue(attributeObj));
+    }
+
+    OperatorMeta om2 = dag.getOperatorMeta("operator2");
+
+    if (set) {
+      Assert.assertEquals(val, om2.getValue(attributeObj));
+    } else {
+      Assert.assertNotEquals(val, om2.getValue(attributeObj));
+    }
+
+    OperatorMeta om3 = dag.getOperatorMeta("operator3");
+
+    if (set) {
+      Assert.assertEquals(val, om3.getValue(attributeObj));
+    } else {
+      Assert.assertNotEquals(val, om3.getValue(attributeObj));
+    }
+  }
+
+  /* Port tests */
+  @Test
+  public void testRootLevelAttributeSimpleNamePort()
+  {
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX,
+                              true, (Integer)4096, true, true);
+  }
+
+  @Test
+  public void testRootLevelAttributeSimpleNamePortNoScope()
+  {
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX,
+                              true, (Integer)4096, true, false);
+  }
+
+  @Test
+  public void testOperatorLevelAttributeSimpleNamePort()
+  {
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" +
+                              LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR,
+                              true, (Integer)4096, true, true);
+  }
+
+  @Test
+  public void testApplicationLevelAttributeSimpleNamePort()
+  {
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" +
+                              LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
+                              true, (Integer)4096, true, true);
+  }
+
+  @Test
+  public void testRootLevelAttributeComplexNamePort()
+  {
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, false,
+                              (Integer)4096, true, true);
+  }
+
+  @Test
+  public void testRootLevelAttributeComplexNamePortNoScope()
+  {
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, false,
+                              (Integer)4096, true, false);
+  }
+
+  @Test
+  public void testOperatorLevelAttributeComplexNamePort()
+  {
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" +
+                              LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR,
+                              false, (Integer)4096, true, true);
+  }
+
+  @Test
+  public void testApplicationLevelAttributeComplexNamePort()
+  {
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" +
+                              LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
+                              false, (Integer)4096, true, true);
+  }
+
+  /* Input port tests */
+  @Test
+  public void testRootLevelAttributeSimpleNameInputPort()
+  {
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, true,
+                                   (Integer)4096, true);
+  }
+
+  @Test
+  public void testOperatorLevelAttributeSimpleNameInputPort()
+  {
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR +
+                                   "*" + LogicalPlanConfiguration.KEY_SEPARATOR, true, (Integer)4096, true);
+  }
+
+  @Test
+  public void testApplicationLevelAttributeSimpleNameInputPort()
+  {
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" +
+                                   LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
+                                   true, (Integer)4096, true);
+  }
+
+  @Test
+  public void testRootLevelAttributeComplexNameInputPort()
+  {
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, false, (Integer)4096, true);
+  }
+
+  @Test
+  public void testOperatorLevelAttributeComplexNameInputPort()
+  {
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" +
+                                   LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, false,
+                                   (Integer)4096, true);
+  }
+
+  @Test
+  public void testApplicationLevelAttributeComplexNameInputPort()
+  {
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" +
+                                   LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR,
+                                   false, (Integer)4096, true);
+  }
+
+  /* Output port tests */
+  @Test
+  public void testRootLevelAttributeSimpleNameOutputPort()
+  {
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, true, (Integer)4096, true);
+  }
+
+  @Test
+  public void testOperatorLevelAttributeSimpleNameOutputPort()
+  {
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR +
+                                    "*" + LogicalPlanConfiguration.KEY_SEPARATOR, true, (Integer)4096, true);
+  }
+
+  @Test
+  public void testApplicationLevelAttributeSimpleNameOutputPort()
+  {
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR +
+                                    "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR, true, (Integer)4096, true);
+  }
+
+  @Test
+  public void testRootLevelAttributeComplexNameOutputPort()
+  {
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, false, (Integer)4096, true);
+  }
+
+  @Test
+  public void testOperatorLevelAttributeComplexNameOutputPort()
+  {
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR +
+                                    "*" + LogicalPlanConfiguration.KEY_SEPARATOR, false, (Integer)4096, true);
+  }
+
+  @Test
+  public void testApplicationLevelAttributeComplexNameOutputPort()
+  {
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR +
+                                    "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR, false, (Integer)4096, true);
+  }
+
+  /* Helpers for building ports */
+  private void simpleAttributePortHelper(Attribute<?> attributeObj, String root, boolean simpleName, Object val, boolean set, boolean scope)
+  {
+    Properties props = propertiesBuilderPort(attributeObj.getSimpleName(), root, simpleName, val, scope);
+
+    simpleNamePortAssertHelperAssert(attributeObj, props, val, set);
+  }
+
+  private void simpleAttributeInputPortHelper(Attribute<?> attributeObj, String root, boolean simpleName, Object val, boolean set)
+  {
+    Properties props = propertiesBuilderInputPort(attributeObj.getSimpleName(), root, simpleName, val);
+
+    simpleNameInputPortAssertHelperAssert(attributeObj, props, val, set);
+    simpleNameOutputPortAssertHelperAssert(attributeObj, props, val, !set);
+  }
+
+  private void simpleAttributeOutputPortHelper(Attribute<?> attributeObj, String root, boolean simpleName, Object val, boolean set)
+  {
+    Properties props = propertiesBuilderOutputPort(attributeObj.getSimpleName(), root, simpleName, val);
+
+    simpleNameOutputPortAssertHelperAssert(attributeObj, props, val, set);
+    simpleNameInputPortAssertHelperAssert(attributeObj, props, val, !set);
+  }
+
+  private void simpleNamePortAssertHelperAssert(Attribute<?> attributeObj, Properties props, Object val, boolean set)
+  {
+    SimpleTestApplicationWithName app = new SimpleTestApplicationWithName();
+
+    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
+    dagBuilder.addFromProperties(props, null);
+
+    String appPath = app.getClass().getName().replace(".", "/") + ".class";
+
+    LogicalPlan dag = new LogicalPlan();
+    dagBuilder.prepareDAG(dag, app, appPath);
+
+    simpleNamePortAssertHelper(attributeObj, dag, "operator1", val, set);
+    simpleNamePortAssertHelper(attributeObj, dag, "operator2", val, set);
+    simpleNamePortAssertHelper(attributeObj, dag, "operator3", val, set);
+  }
+
+  private void simpleNameInputPortAssertHelperAssert(Attribute<?> attributeObj, Properties props, Object val, boolean set)
+  {
+    SimpleTestApplicationWithName app = new SimpleTestApplicationWithName();
+
+    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
+    dagBuilder.addFromProperties(props, null);
+
+    String appPath = app.getClass().getName().replace(".", "/") + ".class";
+
+    LogicalPlan dag = new LogicalPlan();
+    dagBuilder.prepareDAG(dag, app, appPath);
+
+    simpleNameInputPortAssertHelper(attributeObj, dag, "operator1", val, set);
+    simpleNameInputPortAssertHelper(attributeObj, dag, "operator2", val, set);
+    simpleNameInputPortAssertHelper(attributeObj, dag, "operator3", val, set);
+  }
+
+  private void simpleNameOutputPortAssertHelperAssert(Attribute<?> attributeObj, Properties props, Object val, boolean set)
+  {
+    SimpleTestApplicationWithName app = new SimpleTestApplicationWithName();
+
+    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
+    dagBuilder.addFromProperties(props, null);
+
+    String appPath = app.getClass().getName().replace(".", "/") + ".class";
+
+    LogicalPlan dag = new LogicalPlan();
+    dagBuilder.prepareDAG(dag, app, appPath);
+
+    simpleNameOutputPortAssertHelper(attributeObj, dag, "operator1", val, set);
+    simpleNameOutputPortAssertHelper(attributeObj, dag, "operator2", val, set);
+    simpleNameOutputPortAssertHelper(attributeObj, dag, "operator3", val, set);
+  }
+
+  private void simpleNamePortAssertHelper(Attribute<?> attributeObj, LogicalPlan dag, String operatorName, Object queueCapacity, boolean set)
+  {
+    simpleNameInputPortAssertHelper(attributeObj, dag, operatorName, queueCapacity, set);
+    simpleNameOutputPortAssertHelper(attributeObj, dag, operatorName, queueCapacity, set);
+  }
+
+  private void simpleNameInputPortAssertHelper(Attribute<?> attributeObj, LogicalPlan dag, String operatorName, Object queueCapacity, boolean set)
+  {
+    OperatorMeta operatorMeta = dag.getOperatorMeta(operatorName);
+
+    for (InputPortMeta inputPortMeta: operatorMeta.getInputStreams().keySet()) {
+      if (set) {
+        Assert.assertEquals(queueCapacity, inputPortMeta.getValue(attributeObj));
+      } else {
+        Assert.assertNotEquals(queueCapacity, inputPortMeta.getValue(attributeObj));
+      }
+    }
+  }
+
+  private void simpleNameOutputPortAssertHelper(Attribute<?> attributeObj, LogicalPlan dag, String operatorName, Object queueCapacity, boolean set)
+  {
+    OperatorMeta operatorMeta = dag.getOperatorMeta(operatorName);
+
+    for (OutputPortMeta outputPortMeta: operatorMeta.getOutputStreams().keySet()) {
+      if (set) {
+        Assert.assertEquals(queueCapacity, outputPortMeta.getValue(attributeObj));
+      } else {
+        Assert.assertNotEquals(queueCapacity, outputPortMeta.getValue(attributeObj));
+      }
+    }
+  }
+
+  /* Helpers for building properties */
+  private Properties propertiesBuilder(String attributeName, String root, String contextClass, Object val)
+  {
+    boolean simpleName = contextClass == null;
+
+    if (!simpleName) {
+      attributeName = contextClass + LogicalPlanConfiguration.KEY_SEPARATOR + attributeName;
+    }
+
+    Properties props = new Properties();
+    String propName = root + StramElement.ATTR.getValue() + LogicalPlanConfiguration.KEY_SEPARATOR + attributeName;
+    props.put(propName, val.toString());
+
+    return props;
+  }
+
+  private Properties propertiesBuilderOperator(String attributeName, String root, boolean simpleName, Object val, boolean addOperator)
+  {
+    String contextClass = simpleName ? null : OperatorContext.class.getCanonicalName();
+
+    if (addOperator) {
+      root += "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
+    }
+
+    return propertiesBuilder(attributeName, root, contextClass, val);
+  }
+
+  private Properties propertiesBuilderPort(String attributeName, String root, boolean simpleName, Object val, boolean addPort)
+  {
+    String contextClass = simpleName ? null : PortContext.class.getCanonicalName();
+
+    if (addPort) {
+      root += "port" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
+    }
+
+    return propertiesBuilder(attributeName, root, contextClass, val);
+  }
+
+  private Properties propertiesBuilderInputPort(String attributeName, String root, boolean simpleName, Object val)
+  {
+    String contextClass = simpleName ? null: PortContext.class.getCanonicalName();
+
+    root += "inputport" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
+
+    return propertiesBuilder(attributeName, root, contextClass, val);
+  }
+
+  private Properties propertiesBuilderOutputPort(String attributeName, String root, boolean simpleName, Object val)
+  {
+    String contextClass = simpleName ? null: PortContext.class.getCanonicalName();
+
+    root += "outputport" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
+
+    return propertiesBuilder(attributeName, root, contextClass, val);
+  }
+
+  /**
+   * Note: If the same name is given to an Attribute specified in multiple Context classes, then the type of that
+   * Attribute is required to be the same accross all Context classes. This is required because if a simple attribute
+   * name is specified in a properties file at the top level context then that attribute needs to be set in all child configurations. If
+   * there were multiple Attributes specified in different Contexts with the same name, but a different type, then
+   * it would not be possible to set the values of Attributes specified by a simple attribute name in the root
+   * context of a properties file. If this were the case, then adding another Attribute with the same name as a pre-existing Attribute to a new Context
+   * class would be a backwards incompatible change.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testErrorSameAttrMultipleTypes()
+  {
+    //Trigger initialization of attributes for existing Contexts.
+    LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());
+
+    Exception ex = null;
+    try {
+      ContextUtils.buildAttributeMaps(Sets.newHashSet(MockContext1.class, MockContext2.class));
+    } catch (ValidationException e) {
+      ex = e;
+    }
+
+    Assert.assertNotNull(ex);
+    Assert.assertTrue(ex.getMessage().contains("is defined with two different types in two different context classes:"));
+
+    //Clear test data from Context.
+    ContextUtils.initialize();
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(LogicalPlanConfigurationTest.class);
+
+  public static class TestApplication implements StreamingApplication {
+    Integer testprop1;
+    Integer testprop2;
+    Integer testprop3;
+    TestInnerClass inncls;
+    public TestApplication() {
+      inncls=new TestInnerClass();
+    }
+
+    public Integer getTestprop1() {
+      return testprop1;
+    }
+
+    public void setTestprop1(Integer testprop1) {
+      this.testprop1 = testprop1;
+    }
+
+    public Integer getTestprop2() {
+      return testprop2;
+    }
+
+    public void setTestprop2(Integer testprop2) {
+      this.testprop2 = testprop2;
+    }
+
+    public Integer getTestprop3() {
+      return testprop3;
+    }
+
+    public void setTestprop3(Integer testprop3) {
+      this.testprop3 = testprop3;
+    }
+
+    public TestInnerClass getInncls() {
+      return inncls;
+    }
+
+    public void setInncls(TestInnerClass inncls) {
+      this.inncls = inncls;
+    }
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf) {
+
+    }
+    public class TestInnerClass{
+      Integer a;
+
+      public Integer getA() {
+        return a;
+      }
+
+      public void setA(Integer a) {
+        this.a = a;
+      }
+    }
+  }
+
+  public static class TestStatsListener implements StatsListener{
+
+    private int intProp;
+
+    public TestStatsListener()
+    {
+    }
+
+    @Override
+    public Response processStats(BatchedOperatorStats stats)
+    {
+      return null;
+    }
+
+    public int getIntProp()
+    {
+      return intProp;
+    }
+
+    public void setIntProp(int intProp)
+    {
+      this.intProp = intProp;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + intProp;
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      TestStatsListener other = (TestStatsListener) obj;
+      if (intProp != other.intProp)
+        return false;
+      return true;
+    }
+  }
+
+  public static class TestSchema
+  {
+  }
+
+  public static class SimpleTestApplication implements StreamingApplication
+  {
+    public final GenericTestOperator gt1 = new GenericTestOperator();
+    public final GenericTestOperator gt2 = new GenericTestOperator();
+    public final GenericTestOperator gt3 = new GenericTestOperator();
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      dag.addOperator("operator1", gt1);
+      dag.addOperator("operator2", gt2);
+      dag.addOperator("operator3", gt3);
+      dag.addStream("s1", gt1.outport1, gt2.inport1);
+      dag.addStream("s2", gt2.outport1, gt3.inport1, gt3.inport2);
+    }
+  };
+
+  public static interface MockContext1 extends Context
+  {
+    /**
+     * Number of tuples the poll buffer can cache without blocking the input stream to the port.
+     */
+    Attribute<Integer> TEST_ATTR = new Attribute<>(1024);
+
+    @SuppressWarnings("FieldNameHidesFieldInSuperclass")
+    long serialVersionUID = AttributeMap.AttributeInitializer.initialize(MockContext1.class);
+  }
+
+  public static interface MockContext2 extends Context
+  {
+    /**
+     * Number of tuples the poll buffer can cache without blocking the input stream to the port.
+     */
+    Attribute<Boolean> TEST_ATTR = new Attribute<>(false);
+
+    @SuppressWarnings("FieldNameHidesFieldInSuperclass")
+    long serialVersionUID = AttributeMap.AttributeInitializer.initialize(MockContext2.class);
+  }
+
+  @ApplicationAnnotation(name="SimpleTestApp")
+  public static class SimpleTestApplicationWithName extends SimpleTestApplication
+  {
+  };
+
+  private static final Logger LOG = LoggerFactory.getLogger(LogicalPlanConfigurationTest.class);
+}
+


Mime
View raw message