apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vro...@apache.org
Subject [38/50] [abbrv] incubator-apex-core git commit: APEX-28 #resolve
Date Thu, 24 Sep 2015 02:37:57 GMT
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/977093e1/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java
deleted file mode 100644
index 218156b..0000000
--- a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java
+++ /dev/null
@@ -1,1788 +0,0 @@
-/**
- * Copyright (C) 2015 DataTorrent, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.stram.plan;
-
-import com.datatorrent.api.*;
-import com.datatorrent.api.Attribute.AttributeMap.AttributeInitializer;
-import com.datatorrent.api.Context.DAGContext;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.Context.PortContext;
-import com.datatorrent.api.StringCodec.Integer2String;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.common.codec.JsonStreamCodec;
-import com.datatorrent.common.util.BasicContainerOptConfigurator;
-import com.datatorrent.common.util.FSStorageAgent;
-import com.datatorrent.stram.PartitioningTest.PartitionLoadWatch;
-import com.datatorrent.stram.client.StramClientUtils;
-import com.datatorrent.stram.engine.GenericTestOperator;
-import com.datatorrent.stram.engine.TestGeneratorInputOperator;
-import com.datatorrent.stram.plan.LogicalPlanTest.ValidationTestOperator;
-import com.datatorrent.stram.plan.logical.LogicalPlan;
-import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
-import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
-import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta;
-import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
-import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
-import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.AttributeParseUtils;
-import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.ConfElement;
-import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.ContextUtils;
-import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.StramElement;
-import com.datatorrent.stram.plan.logical.MockStorageAgent;
-import com.datatorrent.stram.support.StramTestSupport.RegexMatcher;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.lang.reflect.Field;
-import javax.validation.ValidationException;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.mutable.MutableBoolean;
-import org.apache.hadoop.conf.Configuration;
-import org.codehaus.jettison.json.JSONObject;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-import static org.junit.Assert.*;
-
-
-
-public class LogicalPlanConfigurationTest {
-
-  private static OperatorMeta assertNode(LogicalPlan dag, String id) {
-      OperatorMeta n = dag.getOperatorMeta(id);
-      assertNotNull("operator exists id=" + id, n);
-      return n;
-  }
-
-  /**
-   * Test read from dt-site.xml in Hadoop configuration format.
-   */
-  @Test
-  public void testLoadFromConfigXml() {
-    Configuration conf = new Configuration(false);
-    conf.addResource(StramClientUtils.DT_SITE_XML_FILE);
-
-    LogicalPlanConfiguration builder = new LogicalPlanConfiguration(conf);
-
-    LogicalPlan dag = new LogicalPlan();
-    builder.populateDAG(dag);
-    dag.validate();
-
-    assertEquals("number of operator confs", 6, dag.getAllOperators().size());
-
-    OperatorMeta operator1 = assertNode(dag, "operator1");
-    OperatorMeta operator2 = assertNode(dag, "operator2");
-    OperatorMeta operator3 = assertNode(dag, "operator3");
-    OperatorMeta operator4 = assertNode(dag, "operator4");
-
-    assertNotNull("operatorConf for root", operator1);
-    assertEquals("operatorId set", "operator1", operator1.getName());
-
-    // verify operator instantiation
-    assertEquals(operator1.getOperator().getClass(), TestGeneratorInputOperator.class);
-    TestGeneratorInputOperator GenericTestNode = (TestGeneratorInputOperator)operator1.getOperator();
-    assertEquals("myStringPropertyValue", GenericTestNode.getMyStringProperty());
-
-    // check links
-    assertEquals("operator1 inputs", 0, operator1.getInputStreams().size());
-    assertEquals("operator1 outputs", 1, operator1.getOutputStreams().size());
-    StreamMeta n1n2 = operator2.getInputStreams().get(operator2.getMeta(((GenericTestOperator)operator2.getOperator()).inport1));
-    assertNotNull("n1n2", n1n2);
-
-    // output/input stream object same
-    assertEquals("rootNode out is operator2 in", n1n2, operator1.getOutputStreams().get(operator1.getMeta(((TestGeneratorInputOperator)operator1.getOperator()).outport)));
-    assertEquals("n1n2 source", operator1, n1n2.getSource().getOperatorMeta());
-    Assert.assertEquals("n1n2 targets", 1, n1n2.getSinks().size());
-    Assert.assertEquals("n1n2 target", operator2, n1n2.getSinks().get(0).getOperatorWrapper());
-
-    assertEquals("stream name", "n1n2", n1n2.getName());
-    Assert.assertEquals("n1n2 not inline (default)", null, n1n2.getLocality());
-
-    // operator 2 streams to operator 3 and operator 4
-    assertEquals("operator 2 number of outputs", 1, operator2.getOutputStreams().size());
-    StreamMeta fromNode2 = operator2.getOutputStreams().values().iterator().next();
-
-    Set<OperatorMeta> targetNodes = Sets.newHashSet();
-    for (LogicalPlan.InputPortMeta ip : fromNode2.getSinks()) {
-      targetNodes.add(ip.getOperatorWrapper());
-    }
-    Assert.assertEquals("outputs " + fromNode2, Sets.newHashSet(operator3, operator4), targetNodes);
-
-    OperatorMeta operator6 = assertNode(dag, "operator6");
-
-    List<OperatorMeta> rootNodes = dag.getRootOperators();
-    assertEquals("number root operators", 2, rootNodes.size());
-    assertTrue("root operator2", rootNodes.contains(operator1));
-    assertTrue("root operator6", rootNodes.contains(operator6));
-
-    for (OperatorMeta n : rootNodes) {
-      printTopology(n, dag, 0);
-    }
-
-  }
-
-  private void printTopology(OperatorMeta operator, DAG tplg, int level) {
-      String prefix = "";
-      if (level > 0) {
-        prefix = StringUtils.repeat(" ", 20*(level-1)) + "   |" + StringUtils.repeat("-", 17);
-      }
-      logger.debug(prefix  + operator.getName());
-      for (StreamMeta downStream : operator.getOutputStreams().values()) {
-          if (!downStream.getSinks().isEmpty()) {
-            for (LogicalPlan.InputPortMeta targetNode : downStream.getSinks()) {
-              printTopology(targetNode.getOperatorWrapper(), tplg, level+1);
-            }
-          }
-      }
-  }
-
-  @Test
-  public void testLoadFromPropertiesFile() throws IOException
-  {
-      Properties props = new Properties();
-      String resourcePath = "/testTopology.properties";
-      InputStream is = this.getClass().getResourceAsStream(resourcePath);
-      if (is == null) {
-        fail("Could not load " + resourcePath);
-      }
-      props.load(is);
-      LogicalPlanConfiguration pb = new LogicalPlanConfiguration(new Configuration(false))
-            .addFromProperties(props, null);
-
-      LogicalPlan dag = new LogicalPlan();
-      pb.populateDAG(dag);
-      dag.validate();
-
-      assertEquals("number of operator confs", 5, dag.getAllOperators().size());
-      assertEquals("number of root operators", 1, dag.getRootOperators().size());
-
-      StreamMeta s1 = dag.getStream("n1n2");
-      assertNotNull(s1);
-      assertTrue("n1n2 inline", DAG.Locality.CONTAINER_LOCAL == s1.getLocality());
-
-      OperatorMeta operator3 = dag.getOperatorMeta("operator3");
-      assertEquals("operator3.classname", GenericTestOperator.class, operator3.getOperator().getClass());
-
-      GenericTestOperator doperator3 = (GenericTestOperator)operator3.getOperator();
-      assertEquals("myStringProperty " + doperator3, "myStringPropertyValueFromTemplate", doperator3.getMyStringProperty());
-      assertFalse("booleanProperty " + doperator3, doperator3.booleanProperty);
-
-      OperatorMeta operator4 = dag.getOperatorMeta("operator4");
-      GenericTestOperator doperator4 = (GenericTestOperator)operator4.getOperator();
-      assertEquals("myStringProperty " + doperator4, "overrideOperator4", doperator4.getMyStringProperty());
-      assertEquals("setterOnlyOperator4 " + doperator4, "setterOnlyOperator4", doperator4.propertySetterOnly);
-      assertTrue("booleanProperty " + doperator4, doperator4.booleanProperty);
-
-      StreamMeta input1 = dag.getStream("inputStream");
-      assertNotNull(input1);
-      Assert.assertEquals("input1 source", dag.getOperatorMeta("inputOperator"), input1.getSource().getOperatorMeta());
-      Set<OperatorMeta> targetNodes = Sets.newHashSet();
-      for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) {
-        targetNodes.add(targetPort.getOperatorWrapper());
-      }
-
-      Assert.assertEquals("input1 target ", Sets.newHashSet(dag.getOperatorMeta("operator1"), operator3, operator4), targetNodes);
-
-  }
-
-  @Test
-  public void testLoadFromJson() throws Exception
-  {
-    String resourcePath = "/testTopology.json";
-    InputStream is = this.getClass().getResourceAsStream(resourcePath);
-    if (is == null) {
-      fail("Could not load " + resourcePath);
-    }
-    StringWriter writer = new StringWriter();
-
-    IOUtils.copy(is, writer);
-    JSONObject json = new JSONObject(writer.toString());
-
-    Configuration conf = new Configuration(false);
-    conf.set(StreamingApplication.DT_PREFIX + "operator.operator3.prop.myStringProperty", "o3StringFromConf");
-
-    LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
-    LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");
-    dag.validate();
-
-    assertEquals("DAG attribute CONTAINER_JVM_OPTIONS ", dag.getAttributes().get(DAGContext.CONTAINER_JVM_OPTIONS), "-Xmx16m");
-    Map<Class<?>, Class<? extends StringCodec<?>>> stringCodecsMap = Maps.newHashMap();
-    stringCodecsMap.put(Integer.class, Integer2String.class);
-    assertEquals("DAG attribute STRING_CODECS ", stringCodecsMap, dag.getAttributes().get(DAGContext.STRING_CODECS));
-    assertEquals("DAG attribute CONTAINER_OPTS_CONFIGURATOR ", BasicContainerOptConfigurator.class, dag.getAttributes().get(DAGContext.CONTAINER_OPTS_CONFIGURATOR).getClass());
-
-    assertEquals("number of operator confs", 5, dag.getAllOperators().size());
-    assertEquals("number of root operators", 1, dag.getRootOperators().size());
-
-    StreamMeta s1 = dag.getStream("n1n2");
-    assertNotNull(s1);
-    assertTrue("n1n2 inline", DAG.Locality.CONTAINER_LOCAL == s1.getLocality());
-
-    OperatorMeta input = dag.getOperatorMeta("inputOperator");
-    TestStatsListener tsl = new TestStatsListener();
-    tsl.setIntProp(222);
-    List<StatsListener> sll = Lists.<StatsListener>newArrayList(tsl);
-    assertEquals("inputOperator STATS_LISTENERS attribute ", sll, input.getAttributes().get(OperatorContext.STATS_LISTENERS));
-    for(OutputPortMeta opm : input.getOutputStreams().keySet()){
-      assertTrue("output port of input Operator attribute is JsonStreamCodec ", opm.getAttributes().get(PortContext.STREAM_CODEC) instanceof JsonStreamCodec<?>);
-    }
-
-    OperatorMeta operator3 = dag.getOperatorMeta("operator3");
-    assertEquals("operator3.classname", GenericTestOperator.class, operator3.getOperator().getClass());
-
-    GenericTestOperator doperator3 = (GenericTestOperator)operator3.getOperator();
-    assertEquals("myStringProperty " + doperator3, "o3StringFromConf", doperator3.getMyStringProperty());
-    assertFalse("booleanProperty " + doperator3, doperator3.booleanProperty);
-
-    OperatorMeta operator4 = dag.getOperatorMeta("operator4");
-    GenericTestOperator doperator4 = (GenericTestOperator)operator4.getOperator();
-    assertEquals("myStringProperty " + doperator4, "overrideOperator4", doperator4.getMyStringProperty());
-    assertEquals("setterOnlyOperator4 " + doperator4, "setterOnlyOperator4", doperator4.propertySetterOnly);
-    assertTrue("booleanProperty " + doperator4, doperator4.booleanProperty);
-
-    StreamMeta input1 = dag.getStream("inputStream");
-    assertNotNull(input1);
-    OperatorMeta inputOperator = dag.getOperatorMeta("inputOperator");
-    Assert.assertEquals("input1 source", inputOperator, input1.getSource().getOperatorMeta());
-    Set<OperatorMeta> targetNodes = Sets.newHashSet();
-    for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) {
-      targetNodes.add(targetPort.getOperatorWrapper());
-    }
-    Assert.assertEquals("operator attribute " + inputOperator, 64, (int)inputOperator.getValue(OperatorContext.MEMORY_MB));
-    Assert.assertEquals("port attribute " + inputOperator, 8, (int)input1.getSource().getValue(PortContext.UNIFIER_LIMIT));
-    Assert.assertEquals("input1 target ", Sets.newHashSet(dag.getOperatorMeta("operator1"), operator3, operator4), targetNodes);
-  }
-
-  @Test
-  @SuppressWarnings("UnnecessaryBoxing")
-  public void testAppLevelAttributes()
-  {
-    String appName = "app1";
-
-    Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + DAG.MASTER_MEMORY_MB.getName(), "123");
-    props.put(StreamingApplication.DT_PREFIX + DAG.CONTAINER_JVM_OPTIONS.getName(), "-Dlog4j.properties=custom_log4j.properties");
-    props.put(StreamingApplication.DT_PREFIX + DAG.APPLICATION_PATH.getName(), "/defaultdir");
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + "." + DAG.APPLICATION_PATH.getName(), "/otherdir");
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + "." + DAG.STREAMING_WINDOW_SIZE_MILLIS.getName(), "1000");
-
-    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
-    dagBuilder.addFromProperties(props, null);
-
-    LogicalPlan dag = new LogicalPlan();
-
-    dagBuilder.populateDAG(dag);
-
-    dagBuilder.setApplicationConfiguration(dag, appName, null);
-
-    Assert.assertEquals("", "/otherdir", dag.getValue(DAG.APPLICATION_PATH));
-    Assert.assertEquals("", Integer.valueOf(123), dag.getValue(DAG.MASTER_MEMORY_MB));
-    Assert.assertEquals("", Integer.valueOf(1000), dag.getValue(DAG.STREAMING_WINDOW_SIZE_MILLIS));
-    Assert.assertEquals("", "-Dlog4j.properties=custom_log4j.properties", dag.getValue(DAG.CONTAINER_JVM_OPTIONS));
-
-  }
-  @Test
-  @SuppressWarnings("UnnecessaryBoxing")
-  public void testAppLevelProperties() {
-	  String appName ="app1";
-	  Properties props =new Properties();
-	  props.put(StreamingApplication.DT_PREFIX + "application."+appName+".testprop1","10");
-	  props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".prop.testprop2", "100");
-	  props.put(StreamingApplication.DT_PREFIX + "application.*.prop.testprop3","1000");
-	  props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".inncls.a", "10000");
-	  LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
-    dagBuilder.addFromProperties(props, null);
-
-	  LogicalPlan dag = new LogicalPlan();
-	  TestApplication app1Test=new TestApplication();
-
-	  dagBuilder.setApplicationConfiguration(dag, appName,app1Test);
-	  Assert.assertEquals("",Integer.valueOf(10),app1Test.getTestprop1());
-	  Assert.assertEquals("",Integer.valueOf(100),app1Test.getTestprop2());
-	  Assert.assertEquals("",Integer.valueOf(1000),app1Test.getTestprop3());
-	  Assert.assertEquals("",Integer.valueOf(10000),app1Test.getInncls().getA());
-  }
-
-  @Test
-  public void testPrepareDAG() {
-    final MutableBoolean appInitialized = new MutableBoolean(false);
-    StreamingApplication app = new StreamingApplication() {
-      @Override
-      public void populateDAG(DAG dag, Configuration conf)
-      {
-        Assert.assertEquals("", "hostname:9090", dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS));
-        dag.setAttribute(DAG.GATEWAY_CONNECT_ADDRESS, "hostname:9091");
-        appInitialized.setValue(true);
-      }
-    };
-    Configuration conf = new Configuration(false);
-    conf.addResource(StramClientUtils.DT_SITE_XML_FILE);
-    LogicalPlanConfiguration pb = new LogicalPlanConfiguration(conf);
-
-    LogicalPlan dag = new LogicalPlan();
-    pb.prepareDAG(dag, app, "testconfig");
-
-    Assert.assertTrue("populateDAG called", appInitialized.booleanValue());
-    Assert.assertEquals("populateDAG overrides attribute", "hostname:9091", dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS));
-  }
-
-  @Test
-  public void testOperatorConfigurationLookup() {
-
-    Properties props = new Properties();
-
-    // match operator by name
-    props.put(StreamingApplication.DT_PREFIX + "template.matchId1.matchIdRegExp", ".*operator1.*");
-    props.put(StreamingApplication.DT_PREFIX + "template.matchId1.stringProperty2", "stringProperty2Value-matchId1");
-    props.put(StreamingApplication.DT_PREFIX + "template.matchId1.nested.property", "nested.propertyValue-matchId1");
-
-    // match class name, lower priority
-    props.put(StreamingApplication.DT_PREFIX + "template.matchClass1.matchClassNameRegExp", ".*" + ValidationTestOperator.class.getSimpleName());
-    props.put(StreamingApplication.DT_PREFIX + "template.matchClass1.stringProperty2", "stringProperty2Value-matchClass1");
-
-    // match class name
-    props.put(StreamingApplication.DT_PREFIX + "template.t2.matchClassNameRegExp", ".*"+GenericTestOperator.class.getSimpleName());
-    props.put(StreamingApplication.DT_PREFIX + "template.t2.myStringProperty", "myStringPropertyValue");
-
-    // direct setting
-    props.put(StreamingApplication.DT_PREFIX + "operator.operator3.emitFormat", "emitFormatValue");
-
-    LogicalPlan dag = new LogicalPlan();
-    Operator operator1 = dag.addOperator("operator1", new ValidationTestOperator());
-    Operator operator2 = dag.addOperator("operator2", new ValidationTestOperator());
-    Operator operator3 = dag.addOperator("operator3", new GenericTestOperator());
-
-    LogicalPlanConfiguration pb = new LogicalPlanConfiguration(new Configuration(false));
-    LOG.debug("calling addFromProperties");
-    pb.addFromProperties(props, null);
-
-    Map<String, String> configProps = pb.getProperties(dag.getMeta(operator1), "appName");
-    Assert.assertEquals("" + configProps, 2, configProps.size());
-    Assert.assertEquals("" + configProps, "stringProperty2Value-matchId1", configProps.get("stringProperty2"));
-    Assert.assertEquals("" + configProps, "nested.propertyValue-matchId1", configProps.get("nested.property"));
-
-    configProps = pb.getProperties(dag.getMeta(operator2), "appName");
-    Assert.assertEquals("" + configProps, 1, configProps.size());
-    Assert.assertEquals("" + configProps, "stringProperty2Value-matchClass1", configProps.get("stringProperty2"));
-
-    configProps = pb.getProperties(dag.getMeta(operator3), "appName");
-    Assert.assertEquals("" + configProps, 2, configProps.size());
-    Assert.assertEquals("" + configProps, "myStringPropertyValue", configProps.get("myStringProperty"));
-    Assert.assertEquals("" + configProps, "emitFormatValue", configProps.get("emitFormat"));
-
-  }
-
-  @Test
-  public void testSetOperatorProperties() {
-
-    Configuration conf = new Configuration(false);
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.myStringProperty", "myStringPropertyValue");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.stringArrayField", "a,b,c");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty.key1", "key1Val");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty(key1.dot)", "key1dotVal");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty(key2.dot)", "key2dotVal");
-
-    LogicalPlan dag = new LogicalPlan();
-    GenericTestOperator o1 = dag.addOperator("o1", new GenericTestOperator());
-    ValidationTestOperator o2 = dag.addOperator("o2", new ValidationTestOperator());
-
-    LogicalPlanConfiguration pb = new LogicalPlanConfiguration(conf);
-
-    pb.setOperatorProperties(dag, "testSetOperatorProperties");
-    Assert.assertEquals("o1.myStringProperty", "myStringPropertyValue", o1.getMyStringProperty());
-    Assert.assertArrayEquals("o2.stringArrayField", new String[] {"a", "b", "c"}, o2.getStringArrayField());
-
-    Assert.assertEquals("o2.mapProperty.key1", "key1Val", o2.getMapProperty().get("key1"));
-    Assert.assertEquals("o2.mapProperty(key1.dot)", "key1dotVal", o2.getMapProperty().get("key1.dot"));
-    Assert.assertEquals("o2.mapProperty(key2.dot)", "key2dotVal", o2.getMapProperty().get("key2.dot"));
-
-  }
-
-  @ApplicationAnnotation(name="AnnotatedAlias")
-  class AnnotatedApplication implements StreamingApplication {
-
-    @Override
-    public void populateDAG(DAG dag, Configuration conf)
-    {
-    }
-
-  }
-
-  @Test
-  public void testAppNameAttribute() {
-    StreamingApplication app = new AnnotatedApplication();
-    Configuration conf = new Configuration(false);
-    conf.addResource(StramClientUtils.DT_SITE_XML_FILE);
-
-    LogicalPlanConfiguration builder = new LogicalPlanConfiguration(conf);
-
-    Properties properties = new Properties();
-    properties.put(StreamingApplication.DT_PREFIX + "application.TestAliasApp.class", app.getClass().getName());
-
-    builder.addFromProperties(properties, null);
-
-    LogicalPlan dag = new LogicalPlan();
-    String appPath = app.getClass().getName().replace(".", "/") + ".class";
-    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_NAME, "testApp");
-    builder.prepareDAG(dag, app, appPath);
-
-    Assert.assertEquals("Application name", "testApp", dag.getAttributes().get(com.datatorrent.api.Context.DAGContext.APPLICATION_NAME));
-  }
-
-  @Test
-  public void testAppAlias() {
-    StreamingApplication app = new AnnotatedApplication();
-    Configuration conf = new Configuration(false);
-    conf.addResource(StramClientUtils.DT_SITE_XML_FILE);
-
-    LogicalPlanConfiguration builder = new LogicalPlanConfiguration(conf);
-
-    Properties properties = new Properties();
-    properties.put(StreamingApplication.DT_PREFIX + "application.TestAliasApp.class", app.getClass().getName());
-
-    builder.addFromProperties(properties, null);
-
-    LogicalPlan dag = new LogicalPlan();
-    String appPath = app.getClass().getName().replace(".", "/") + ".class";
-    builder.prepareDAG(dag, app, appPath);
-
-    Assert.assertEquals("Application name", "TestAliasApp", dag.getAttributes().get(com.datatorrent.api.Context.DAGContext.APPLICATION_NAME));
-  }
-
-
-  @Test
-  public void testAppAnnotationAlias() {
-    StreamingApplication app = new AnnotatedApplication();
-    Configuration conf = new Configuration(false);
-    conf.addResource(StramClientUtils.DT_SITE_XML_FILE);
-
-    LogicalPlanConfiguration builder = new LogicalPlanConfiguration(conf);
-
-    LogicalPlan dag = new LogicalPlan();
-    String appPath = app.getClass().getName().replace(".", "/") + ".class";
-    builder.prepareDAG(dag, app, appPath);
-
-    Assert.assertEquals("Application name", "AnnotatedAlias", dag.getAttributes().get(com.datatorrent.api.Context.DAGContext.APPLICATION_NAME));
-  }
-
-  @Test
-  @SuppressWarnings( {"UnnecessaryBoxing", "AssertEqualsBetweenInconvertibleTypes"})
-  public void testOperatorLevelAttributes() {
-    String appName = "app1";
-    StreamingApplication app = new StreamingApplication() {
-      @Override
-      public void populateDAG(DAG dag, Configuration conf)
-      {
-        dag.addOperator("operator1", GenericTestOperator.class);
-        dag.addOperator("operator2", GenericTestOperator.class);
-      }
-    };
-
-    Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
-    props.put(StreamingApplication.DT_PREFIX + "operator.*." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "2");
-    props.put(StreamingApplication.DT_PREFIX + "operator.*." + OperatorContext.STATS_LISTENERS.getName(), PartitionLoadWatch.class.getName());
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "20");
-
-    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
-    dagBuilder.addFromProperties(props, null);
-
-    String appPath = app.getClass().getName().replace(".", "/") + ".class";
-
-    LogicalPlan dag = new LogicalPlan();
-    dagBuilder.prepareDAG(dag, app, appPath);
-
-    Assert.assertEquals("", Integer.valueOf(20), dag.getOperatorMeta("operator1").getValue(OperatorContext.APPLICATION_WINDOW_COUNT));
-    Assert.assertEquals("", Integer.valueOf(2), dag.getOperatorMeta("operator2").getValue(OperatorContext.APPLICATION_WINDOW_COUNT));
-    Assert.assertEquals("", PartitionLoadWatch.class, dag.getOperatorMeta("operator2").getValue(OperatorContext.STATS_LISTENERS).toArray()[0].getClass());
-  }
-
-  @Test
-  public void testOperatorLevelProperties() {
-    String appName = "app1";
-    final GenericTestOperator operator1 = new GenericTestOperator();
-    final GenericTestOperator operator2 = new GenericTestOperator();
-    StreamingApplication app = new StreamingApplication() {
-      @Override
-      public void populateDAG(DAG dag, Configuration conf)
-      {
-        dag.addOperator("operator1", operator1);
-        dag.addOperator("operator2", operator2);
-      }
-    };
-
-    Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
-    props.put(StreamingApplication.DT_PREFIX + "operator.*.myStringProperty", "pv1");
-    props.put(StreamingApplication.DT_PREFIX + "operator.*.booleanProperty", Boolean.TRUE.toString());
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.myStringProperty", "apv1");
-
-    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
-    dagBuilder.addFromProperties(props, null);
-
-    String appPath = app.getClass().getName().replace(".", "/") + ".class";
-
-    LogicalPlan dag = new LogicalPlan();
-    dagBuilder.prepareDAG(dag, app, appPath);
-
-    Assert.assertEquals("apv1", operator1.getMyStringProperty());
-    Assert.assertEquals("pv1", operator2.getMyStringProperty());
-    Assert.assertEquals(true, operator2.isBooleanProperty());
-  }
-
-  @Test
-  public void testApplicationLevelParameter()
-  {
-    String appName = "app1";
-    final GenericTestOperator operator1 = new GenericTestOperator();
-    final GenericTestOperator operator2 = new GenericTestOperator();
-    StreamingApplication app = new StreamingApplication()
-    {
-      @Override
-      public void populateDAG(DAG dag, Configuration conf)
-      {
-        dag.addOperator("operator1", operator1);
-        dag.addOperator("operator2", operator2);
-      }
-    };
-
-    Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
-    props.put(StreamingApplication.DT_PREFIX + "operator.*.myStringProperty", "foo ${xyz} bar ${zzz} baz");
-    props.put(StreamingApplication.DT_PREFIX + "operator.*.booleanProperty", Boolean.TRUE.toString());
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.myStringProperty", "apv1");
-
-    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
-
-    Configuration vars = new Configuration(false);
-    vars.set("xyz", "123");
-    vars.set("zzz", "456");
-    dagBuilder.addFromProperties(props, vars);
-
-    String appPath = app.getClass().getName().replace(".", "/") + ".class";
-
-    LogicalPlan dag = new LogicalPlan();
-    dagBuilder.prepareDAG(dag, app, appPath);
-
-    Assert.assertEquals("apv1", operator1.getMyStringProperty());
-    Assert.assertEquals("foo 123 bar 456 baz", operator2.getMyStringProperty());
-    Assert.assertEquals(true, operator2.isBooleanProperty());
-  }
-
-  @Test
-  @SuppressWarnings("UnnecessaryBoxing")
-  public void testPortLevelAttributes() {
-    String appName = "app1";
-    SimpleTestApplication app = new SimpleTestApplication();
-
-    Properties props = new Properties();
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.port.*." + PortContext.QUEUE_CAPACITY.getName(), "" + 16 * 1024);
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator2.inputport.inport1." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024);
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator2.outputport.outport1." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024);
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator3.port.*." + PortContext.QUEUE_CAPACITY.getName(), "" + 16 * 1024);
-    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator3.inputport.inport2." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024);
-
-    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
-    dagBuilder.addFromProperties(props, null);
-
-    String appPath = app.getClass().getName().replace(".", "/") + ".class";
-
-    LogicalPlan dag = new LogicalPlan();
-    dagBuilder.prepareDAG(dag, app, appPath);
-
-    OperatorMeta om1 = dag.getOperatorMeta("operator1");
-    Assert.assertEquals("", Integer.valueOf(16 * 1024), om1.getMeta(app.gt1.outport1).getValue(PortContext.QUEUE_CAPACITY));
-    OperatorMeta om2 = dag.getOperatorMeta("operator2");
-    Assert.assertEquals("", Integer.valueOf(32 * 1024), om2.getMeta(app.gt2.inport1).getValue(PortContext.QUEUE_CAPACITY));
-    Assert.assertEquals("", Integer.valueOf(32 * 1024), om2.getMeta(app.gt2.outport1).getValue(PortContext.QUEUE_CAPACITY));
-    OperatorMeta om3 = dag.getOperatorMeta("operator3");
-    Assert.assertEquals("", Integer.valueOf(16 * 1024), om3.getMeta(app.gt3.inport1).getValue(PortContext.QUEUE_CAPACITY));
-    Assert.assertEquals("", Integer.valueOf(32 * 1024), om3.getMeta(app.gt3.inport2).getValue(PortContext.QUEUE_CAPACITY));
-  }
-
-
-  /**
-   * Verifies that attribute references in properties are rejected both when the attribute does
-   * not support property configuration and when the attribute name is unknown.
-   * <br/><br/>
-   * An attribute named NOT_CONFIGURABLE is registered on the DAG context for the first half of the
-   * test. It is unregistered in a finally block so that a failed assertion or an unexpected
-   * exception does not leave it registered for the tests that run afterwards.
-   *
-   * @throws Exception if the reflective access to the attribute name fails
-   */
-  @Test
-  public void testInvalidAttribute() throws Exception {
-    // NOTE(review): presumably this reference only forces DAGContext to be initialized before the attribute is added - confirm
-    Assert.assertNotSame(0, com.datatorrent.api.Context.DAGContext.serialVersionUID);
-    Attribute<String> attribute = new Attribute<>("", null);
-
-    // the attribute is created with an empty name, the real name is assigned reflectively
-    Field nameField = Attribute.class.getDeclaredField("name");
-    nameField.setAccessible(true);
-    nameField.set(attribute, "NOT_CONFIGURABLE");
-    nameField.setAccessible(false);
-
-    ContextUtils.addAttribute(com.datatorrent.api.Context.DAGContext.class, attribute);
-    try {
-      AttributeParseUtils.initialize();
-      ConfElement.initialize();
-
-      // attribute that cannot be configured
-
-      Properties props = new Properties();
-      props.put(StreamingApplication.DT_PREFIX + "attr.NOT_CONFIGURABLE", "value");
-
-      LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
-      dagBuilder.addFromProperties(props, null);
-
-      try {
-        dagBuilder.prepareDAG(new LogicalPlan(), null, "");
-        // Assert.fail throws an AssertionError, which the catch (Exception) below does not intercept
-        Assert.fail("Exception expected");
-      } catch (Exception e) {
-        Assert.assertThat("Attribute not configurable", e.getMessage(), RegexMatcher.matches("Attribute does not support property configuration: NOT_CONFIGURABLE.*"));
-      }
-    } finally {
-      // undo the registration and re-initialize on every exit path
-      ContextUtils.removeAttribute(com.datatorrent.api.Context.DAGContext.class, attribute);
-      AttributeParseUtils.initialize();
-      ConfElement.initialize();
-    }
-
-    // invalid attribute name
-    Properties props = new Properties();
-    String invalidAttribute = StreamingApplication.DT_PREFIX + "attr.INVALID_NAME";
-    props.put(invalidAttribute, "value");
-
-    try {
-      new LogicalPlanConfiguration(new Configuration(false)).addFromProperties(props, null);
-      Assert.fail("Exception expected");
-    } catch (Exception e) {
-      LOG.debug("Exception message: {}", e.getMessage());
-      Assert.assertThat("Invalid attribute name", e.getMessage(), RegexMatcher.matches("Invalid attribute reference: " + invalidAttribute));
-    }
-  }
-
-  /**
-   * Checks that every attribute returned by AttributeInitializer for the DAG, operator and port
-   * context classes has a non-null codec.
-   */
-  @Test
-  public void testAttributesCodec() {
-    // NOTE(review): presumably the serialVersionUID reads only force the context classes to be initialized - confirm
-    Assert.assertNotSame(null, new Long[] {com.datatorrent.api.Context.DAGContext.serialVersionUID, OperatorContext.serialVersionUID, PortContext.serialVersionUID});
-    @SuppressWarnings("unchecked")
-    Set<Class<? extends Context>> contextTypes = Sets.newHashSet(com.datatorrent.api.Context.DAGContext.class, OperatorContext.class, PortContext.class);
-    for (Class<?> contextType : contextTypes) {
-      for (Attribute<Object> declared : AttributeInitializer.getAttributes(contextType)) {
-        String failureMessage = declared.name + " codec";
-        Assert.assertNotNull(failureMessage, declared.codec);
-      }
-    }
-  }
-
-  /**
-   * Builds a DAG from the /schemaTestTopology.json resource and checks that operator1 is a
-   * SchemaTestOperator and that every sink of the stream "inputStream" carries TestSchema as its
-   * TUPLE_CLASS port attribute.
-   *
-   * @throws Exception if the resource cannot be read or the JSON cannot be turned into a DAG
-   */
-  @Test
-  public void testTupleClassAttr() throws Exception
-  {
-    String resourcePath = "/schemaTestTopology.json";
-    StringWriter writer = new StringWriter();
-
-    // try-with-resources closes the stream on every exit path; a null resource is simply not closed
-    try (InputStream is = this.getClass().getResourceAsStream(resourcePath)) {
-      if (is == null) {
-        fail("Could not load " + resourcePath);
-      }
-      IOUtils.copy(is, writer);
-    }
-    JSONObject json = new JSONObject(writer.toString());
-
-    Configuration conf = new Configuration(false);
-
-    LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
-    LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");
-    dag.validate();
-
-    OperatorMeta operator1 = dag.getOperatorMeta("operator1");
-    assertEquals("operator1.classname", SchemaTestOperator.class, operator1.getOperator().getClass());
-
-    StreamMeta input1 = dag.getStream("inputStream");
-    assertNotNull(input1);
-    for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) {
-      Assert.assertEquals("tuple class name required", TestSchema.class, targetPort.getAttributes().get(PortContext.TUPLE_CLASS));
-    }
-  }
-
-  /**
-   * Builds a DAG from the /schemaTestTopology.json resource after removing the "schema" entry of
-   * the first stream and expects a ValidationException.
-   *
-   * @throws Exception if the resource cannot be read; a ValidationException is the expected outcome
-   */
-  @Test(expected = ValidationException.class)
-  public void testTupleClassAttrValidation() throws Exception
-  {
-    String resourcePath = "/schemaTestTopology.json";
-    StringWriter writer = new StringWriter();
-
-    // try-with-resources closes the stream on every exit path; a null resource is simply not closed
-    try (InputStream is = this.getClass().getResourceAsStream(resourcePath)) {
-      if (is == null) {
-        fail("Could not load " + resourcePath);
-      }
-      IOUtils.copy(is, writer);
-    }
-    JSONObject json = new JSONObject(writer.toString());
-
-    //removing schema so that validation fails
-    json.getJSONArray("streams").getJSONObject(0).remove("schema");
-    Configuration conf = new Configuration(false);
-
-    LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
-    LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");
-
-    dag.validate();
-  }
-
-  /**
-   * Sets TUPLE_CLASS for port schemaRequiredPort of operator o2 through the configuration,
-   * connects o1.outport to that port and validates the prepared DAG.
-   */
-  @Test
-  public void testTestTupleClassAttrSetFromConfig()
-  {
-    Configuration conf = new Configuration(false);
-    String tupleClassKey = StreamingApplication.DT_PREFIX + "operator.o2.port.schemaRequiredPort.attr.TUPLE_CLASS";
-    conf.set(tupleClassKey, "com.datatorrent.stram.plan.LogicalPlanConfigurationTest$TestSchema");
-
-    StreamingApplication application = new StreamingApplication()
-    {
-      @Override
-      public void populateDAG(DAG dag, Configuration conf)
-      {
-        TestGeneratorInputOperator generator = dag.addOperator("o1", new TestGeneratorInputOperator());
-        SchemaTestOperator schemaOperator = dag.addOperator("o2", new SchemaTestOperator());
-        dag.addStream("stream", generator.outport, schemaOperator.schemaRequiredPort);
-      }
-    };
-
-    LogicalPlan plan = new LogicalPlan();
-    new LogicalPlanConfiguration(conf).prepareDAG(plan, application, "app");
-    plan.validate();
-  }
-
-  /**
-   * This test and all of the following ambiguous attribute tests verify that when an ambiguous
-   * attribute name is provided, all the corresponding attributes are set.
-   * <br/><br/>
-   * <b>Note:</b> An attribute is ambiguous when attributes with the same simple name exist for
-   * multiple types of dag elements (like operators and ports). An example of such attributes are
-   * com.datatorrent.api.Context.OperatorContext.AUTO_RECORD and
-   * com.datatorrent.api.Context.PortContext.AUTO_RECORD.
-   * <br/><br/>
-   * This test should set the attribute on the operators and ports.
-   */
-  @Test
-  public void testRootLevelAmbiguousAttributeSimple()
-  {
-    Attribute<?> operatorAttribute = Context.OperatorContext.AUTO_RECORD;
-    Attribute<?> portAttribute = Context.PortContext.AUTO_RECORD;
-    String root = StreamingApplication.DT_PREFIX;
-
-    // no context class is given; both the operator and the port expectation flags are true
-    testAttributeAmbiguousSimpleHelper(operatorAttribute, portAttribute, root, null, Boolean.TRUE, true, true);
-  }
-
-  /**
-   * Uses an "application" + separator + "*" root without a context class; the attribute is
-   * expected on the operators and on the ports.
-   */
-  @Test
-  public void testApplicationLevelAmbiguousAttributeSimple()
-  {
-    Attribute<?> operatorAttribute = Context.OperatorContext.AUTO_RECORD;
-    Attribute<?> portAttribute = Context.PortContext.AUTO_RECORD;
-    String root = StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR
-        + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
-
-    testAttributeAmbiguousSimpleHelper(operatorAttribute, portAttribute, root, null, Boolean.TRUE, true, true);
-  }
-
-  /**
-   * Uses an "operator" + separator + "*" root without a context class; the attribute is expected
-   * on the operators only.
-   */
-  @Test
-  public void testOperatorLevelAmbiguousAttributeSimple()
-  {
-    Attribute<?> operatorAttribute = Context.OperatorContext.AUTO_RECORD;
-    Attribute<?> portAttribute = Context.PortContext.AUTO_RECORD;
-    String root = StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR
-        + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
-
-    testAttributeAmbiguousSimpleHelper(operatorAttribute, portAttribute, root, null, Boolean.TRUE, true, false);
-  }
-
-  /**
-   * Uses a "port" + separator + "*" root without a context class; the attribute is expected on
-   * the ports only.
-   */
-  @Test
-  public void testPortLevelAmbiguousAttributeSimple()
-  {
-    Attribute<?> operatorAttribute = Context.OperatorContext.AUTO_RECORD;
-    Attribute<?> portAttribute = Context.PortContext.AUTO_RECORD;
-    String root = StreamingApplication.DT_PREFIX + "port" + LogicalPlanConfiguration.KEY_SEPARATOR
-        + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
-
-    testAttributeAmbiguousSimpleHelper(operatorAttribute, portAttribute, root, null, Boolean.TRUE, false, true);
-  }
-
-  /**
-   * Uses the root prefix together with the canonical name of PortContext as context class.
-   * This should only set the attribute on the ports (operatorSet is false, portSet is true).
-   */
-  @Test
-  public void testRootLevelAmbiguousAttributeComplex()
-  {
-    Attribute<?> operatorAttribute = Context.OperatorContext.AUTO_RECORD;
-    Attribute<?> portAttribute = Context.PortContext.AUTO_RECORD;
-    String root = StreamingApplication.DT_PREFIX;
-    String contextClass = PortContext.class.getCanonicalName();
-
-    testAttributeAmbiguousSimpleHelper(operatorAttribute, portAttribute, root, contextClass, Boolean.TRUE, false, true);
-  }
-
-  /**
-   * Uses an "application" + separator + "*" root together with the canonical name of PortContext
-   * as context class. This should only set the attribute on the ports (operatorSet is false,
-   * portSet is true).
-   */
-  @Test
-  public void testApplicationLevelAmbiguousAttributeComplex()
-  {
-    Attribute<?> operatorAttribute = Context.OperatorContext.AUTO_RECORD;
-    Attribute<?> portAttribute = Context.PortContext.AUTO_RECORD;
-    String root = StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR
-        + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
-    String contextClass = PortContext.class.getCanonicalName();
-
-    testAttributeAmbiguousSimpleHelper(operatorAttribute, portAttribute, root, contextClass, Boolean.TRUE, false, true);
-  }
-
-  /**
-   * Uses an "operator" + separator + "*" root together with the canonical name of OperatorContext
-   * as context class; the attribute is expected on the operators only.
-   */
-  @Test
-  public void testOperatorLevelAmbiguousAttributeComplex()
-  {
-    Attribute<?> operatorAttribute = Context.OperatorContext.AUTO_RECORD;
-    Attribute<?> portAttribute = Context.PortContext.AUTO_RECORD;
-    String root = StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR
-        + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
-    String contextClass = OperatorContext.class.getCanonicalName();
-
-    testAttributeAmbiguousSimpleHelper(operatorAttribute, portAttribute, root, contextClass, Boolean.TRUE, true, false);
-  }
-
-  /**
-   * Uses an "operator" + separator + "*" root together with the canonical name of PortContext as
-   * context class; the attribute is expected on the ports only.
-   */
-  @Test
-  public void testOperatorLevelAmbiguousAttributeComplex2()
-  {
-    Attribute<?> operatorAttribute = Context.OperatorContext.AUTO_RECORD;
-    Attribute<?> portAttribute = Context.PortContext.AUTO_RECORD;
-    String root = StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR
-        + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
-    String contextClass = PortContext.class.getCanonicalName();
-
-    testAttributeAmbiguousSimpleHelper(operatorAttribute, portAttribute, root, contextClass, Boolean.TRUE, false, true);
-  }
-
-  /**
-   * Uses a "port" + separator + "*" root together with the canonical name of PortContext as
-   * context class; the attribute is expected on the ports only.
-   */
-  @Test
-  public void testPortLevelAmbiguousAttributeComplex()
-  {
-    Attribute<?> operatorAttribute = Context.OperatorContext.AUTO_RECORD;
-    Attribute<?> portAttribute = Context.PortContext.AUTO_RECORD;
-    String root = StreamingApplication.DT_PREFIX + "port" + LogicalPlanConfiguration.KEY_SEPARATOR
-        + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
-    String contextClass = PortContext.class.getCanonicalName();
-
-    testAttributeAmbiguousSimpleHelper(operatorAttribute, portAttribute, root, contextClass, Boolean.TRUE, false, true);
-  }
-
-  /**
-   * Builds properties from the simple name of the operator attribute and runs the operator
-   * assertions followed by the port assertions against those properties.
-   *
-   * @param attributeObjOperator operator attribute; its simple name is used to build the properties
-   * @param attributeObjPort port attribute handed to the port assertions
-   * @param root prefix handed to propertiesBuilder
-   * @param contextClass context class name handed to propertiesBuilder; callers also pass null
-   * @param val value handed to propertiesBuilder and to both assertion helpers
-   * @param operatorSet expectation flag forwarded to the operator assertions
-   * @param portSet expectation flag forwarded to the port assertions
-   */
-  private void testAttributeAmbiguousSimpleHelper(Attribute<?> attributeObjOperator,
-                                                  Attribute<?> attributeObjPort,
-                                                  String root,
-                                                  String contextClass,
-                                                  Object val,
-                                                  boolean operatorSet,
-                                                  boolean portSet)
-  {
-    String attributeName = attributeObjOperator.getSimpleName();
-    Properties props = propertiesBuilder(attributeName, root, contextClass, val);
-
-    simpleAttributeOperatorHelperAssert(attributeObjOperator, props, val, operatorSet);
-    simpleNamePortAssertHelperAssert(attributeObjPort, props, val, portSet);
-  }
-
-  /**
-   * Configures OperatorContext.MEMORY_MB with value 4096 under the root prefix
-   * (simpleName=true, scope=true) and expects it on the operators.
-   */
-  @Test
-  public void testRootLevelAttributeSimpleNameOperator()
-  {
-    Attribute<?> attribute = OperatorContext.MEMORY_MB;
-    String root = StreamingApplication.DT_PREFIX;
-    Object memory = Integer.valueOf(4096);
-
-    simpleAttributeOperatorHelper(attribute, root, true, memory, true, true);
-  }
-
-  /**
-   * Configures OperatorContext.STORAGE_AGENT with a MockStorageAgent under the root prefix
-   * (simpleName=true, scope=false) and expects it on the operators.
-   */
-  @Test
-  public void testRootLevelStorageAgentSimpleNameOperator()
-  {
-    MockStorageAgent storageAgent = new MockStorageAgent();
-    Attribute<?> attribute = OperatorContext.STORAGE_AGENT;
-    String root = StreamingApplication.DT_PREFIX;
-
-    simpleAttributeOperatorHelper(attribute, root, true, storageAgent, true, false);
-  }
-
-  /**
-   * Configures OperatorContext.MEMORY_MB with value 4096 under the root prefix
-   * (simpleName=true, scope=false) and expects it on the operators.
-   */
-  @Test
-  public void testRootLevelAttributeSimpleNameOperatorNoScope()
-  {
-    Attribute<?> attribute = OperatorContext.MEMORY_MB;
-    String root = StreamingApplication.DT_PREFIX;
-    Object memory = Integer.valueOf(4096);
-
-    simpleAttributeOperatorHelper(attribute, root, true, memory, true, false);
-  }
-
-  /**
-   * Configures OperatorContext.MEMORY_MB with value 4096 under an
-   * "application" + separator + "SimpleTestApp" root (simpleName=true, scope=true) and expects
-   * it on the operators.
-   */
-  @Test
-  public void testApplicationLevelAttributeSimpleNameOperator()
-  {
-    Attribute<?> attribute = OperatorContext.MEMORY_MB;
-    String root = StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR
-        + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR;
-    Object memory = Integer.valueOf(4096);
-
-    simpleAttributeOperatorHelper(attribute, root, true, memory, true, true);
-  }
-
-  /**
-   * Builds operator properties for the attribute's simple name and runs the operator assertions.
-   *
-   * @param attributeObj attribute under test
-   * @param root prefix handed to propertiesBuilderOperator
-   * @param simpleName flag handed to propertiesBuilderOperator
-   * @param val value handed to propertiesBuilderOperator and to the assertions
-   * @param set expectation flag forwarded to the assertions
-   * @param scope flag handed to propertiesBuilderOperator
-   */
-  private void simpleAttributeOperatorHelper(Attribute<?> attributeObj,
-                                             String root,
-                                             boolean simpleName,
-                                             Object val,
-                                             boolean set,
-                                             boolean scope)
-  {
-    String attributeName = attributeObj.getSimpleName();
-    Properties props = propertiesBuilderOperator(attributeName, root, simpleName, val, scope);
-
-    simpleAttributeOperatorHelperAssert(attributeObj, props, val, set);
-  }
-
-  /**
-   * Prepares a DAG for SimpleTestApplicationWithName from the given properties and compares the
-   * attribute value of operator1, operator2 and operator3 with the given value.
-   *
-   * @param attributeObj attribute read from each operator's meta
-   * @param props properties added to the plan configuration before the DAG is prepared
-   * @param val value the attribute is compared with
-   * @param set when true the attribute must equal val, otherwise it must differ from val
-   */
-  private void simpleAttributeOperatorHelperAssert(Attribute<?> attributeObj,
-                                                   Properties props,
-                                                   Object val,
-                                                   boolean set)
-  {
-    SimpleTestApplicationWithName application = new SimpleTestApplicationWithName();
-
-    LogicalPlanConfiguration planConfiguration = new LogicalPlanConfiguration(new Configuration(false));
-    planConfiguration.addFromProperties(props, null);
-
-    String classResource = application.getClass().getName().replace(".", "/") + ".class";
-
-    LogicalPlan plan = new LogicalPlan();
-    planConfiguration.prepareDAG(plan, application, classResource);
-
-    // the operators are checked one after the other, in this order
-    for (String operatorName : new String[] {"operator1", "operator2", "operator3"}) {
-      Object actual = plan.getOperatorMeta(operatorName).getValue(attributeObj);
-
-      if (!set) {
-        Assert.assertNotEquals(val, actual);
-      } else {
-        Assert.assertEquals(val, actual);
-      }
-    }
-  }
-
-  /* Port tests */
-  /**
-   * Configures PortContext.QUEUE_CAPACITY with value 4096 under the root prefix
-   * (simpleName=true, scope=true) and runs the port assertions with set=true.
-   */
-  @Test
-  public void testRootLevelAttributeSimpleNamePort()
-  {
-    Attribute<?> attribute = PortContext.QUEUE_CAPACITY;
-    String root = StreamingApplication.DT_PREFIX;
-    Object capacity = Integer.valueOf(4096);
-
-    simpleAttributePortHelper(attribute, root, true, capacity, true, true);
-  }
-
-  /**
-   * Configures PortContext.QUEUE_CAPACITY with value 4096 under the root prefix
-   * (simpleName=true, scope=false) and runs the port assertions with set=true.
-   */
-  @Test
-  public void testRootLevelAttributeSimpleNamePortNoScope()
-  {
-    Attribute<?> attribute = PortContext.QUEUE_CAPACITY;
-    String root = StreamingApplication.DT_PREFIX;
-    Object capacity = Integer.valueOf(4096);
-
-    simpleAttributePortHelper(attribute, root, true, capacity, true, false);
-  }
-
-  /**
-   * Configures PortContext.QUEUE_CAPACITY with value 4096 under an "operator" + separator + "*"
-   * root (simpleName=true, scope=true) and runs the port assertions with set=true.
-   */
-  @Test
-  public void testOperatorLevelAttributeSimpleNamePort()
-  {
-    Attribute<?> attribute = PortContext.QUEUE_CAPACITY;
-    String root = StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR
-        + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
-    Object capacity = Integer.valueOf(4096);
-
-    simpleAttributePortHelper(attribute, root, true, capacity, true, true);
-  }
-
-  /**
-   * Configures PortContext.QUEUE_CAPACITY with value 4096 under an
-   * "application" + separator + "SimpleTestApp" root (simpleName=true, scope=true) and runs the
-   * port assertions with set=true.
-   */
-  @Test
-  public void testApplicationLevelAttributeSimpleNamePort()
-  {
-    Attribute<?> attribute = PortContext.QUEUE_CAPACITY;
-    String root = StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR
-        + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR;
-    Object capacity = Integer.valueOf(4096);
-
-    simpleAttributePortHelper(attribute, root, true, capacity, true, true);
-  }
-
-  /**
-   * Configures PortContext.QUEUE_CAPACITY with value 4096 under the root prefix
-   * (simpleName=false, scope=true) and runs the port assertions with set=true.
-   */
-  @Test
-  public void testRootLevelAttributeComplexNamePort()
-  {
-    Attribute<?> attribute = PortContext.QUEUE_CAPACITY;
-    String root = StreamingApplication.DT_PREFIX;
-    Object capacity = Integer.valueOf(4096);
-
-    simpleAttributePortHelper(attribute, root, false, capacity, true, true);
-  }
-
-  /**
-   * Configures PortContext.QUEUE_CAPACITY with value 4096 under the root prefix
-   * (simpleName=false, scope=false) and runs the port assertions with set=true.
-   */
-  @Test
-  public void testRootLevelAttributeComplexNamePortNoScope()
-  {
-    Attribute<?> attribute = PortContext.QUEUE_CAPACITY;
-    String root = StreamingApplication.DT_PREFIX;
-    Object capacity = Integer.valueOf(4096);
-
-    simpleAttributePortHelper(attribute, root, false, capacity, true, false);
-  }
-
-  /**
-   * Configures PortContext.QUEUE_CAPACITY with value 4096 under an "operator" + separator + "*"
-   * root (simpleName=false, scope=true) and runs the port assertions with set=true.
-   */
-  @Test
-  public void testOperatorLevelAttributeComplexNamePort()
-  {
-    Attribute<?> attribute = PortContext.QUEUE_CAPACITY;
-    String root = StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR
-        + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
-    Object capacity = Integer.valueOf(4096);
-
-    simpleAttributePortHelper(attribute, root, false, capacity, true, true);
-  }
-
-  /**
-   * Configures PortContext.QUEUE_CAPACITY with value 4096 under an
-   * "application" + separator + "SimpleTestApp" root (simpleName=false, scope=true) and runs the
-   * port assertions with set=true.
-   */
-  @Test
-  public void testApplicationLevelAttributeComplexNamePort()
-  {
-    Attribute<?> attribute = PortContext.QUEUE_CAPACITY;
-    String root = StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR
-        + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR;
-    Object capacity = Integer.valueOf(4096);
-
-    simpleAttributePortHelper(attribute, root, false, capacity, true, true);
-  }
-
-  /* Input port tests */
-  /**
-   * Configures PortContext.QUEUE_CAPACITY with value 4096 for input ports under the root prefix
-   * (simpleName=true) and runs the input port helper with set=true.
-   */
-  @Test
-  public void testRootLevelAttributeSimpleNameInputPort()
-  {
-    Attribute<?> attribute = PortContext.QUEUE_CAPACITY;
-    String root = StreamingApplication.DT_PREFIX;
-    Object capacity = Integer.valueOf(4096);
-
-    simpleAttributeInputPortHelper(attribute, root, true, capacity, true);
-  }
-
-  /**
-   * Configures PortContext.QUEUE_CAPACITY with value 4096 for input ports under an
-   * "operator" + separator + "*" root (simpleName=true) and runs the input port helper with
-   * set=true.
-   */
-  @Test
-  public void testOperatorLevelAttributeSimpleNameInputPort()
-  {
-    Attribute<?> attribute = PortContext.QUEUE_CAPACITY;
-    String root = StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR
-        + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
-    Object capacity = Integer.valueOf(4096);
-
-    simpleAttributeInputPortHelper(attribute, root, true, capacity, true);
-  }
-
-  /**
-   * Configures PortContext.QUEUE_CAPACITY with value 4096 for input ports under an
-   * "application" + separator + "SimpleTestApp" root (simpleName=true) and runs the input port
-   * helper with set=true.
-   */
-  @Test
-  public void testApplicationLevelAttributeSimpleNameInputPort()
-  {
-    Attribute<?> attribute = PortContext.QUEUE_CAPACITY;
-    String root = StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR
-        + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR;
-    Object capacity = Integer.valueOf(4096);
-
-    simpleAttributeInputPortHelper(attribute, root, true, capacity, true);
-  }
-
-  /**
-   * Configures PortContext.QUEUE_CAPACITY with value 4096 for input ports under the root prefix
-   * (simpleName=false) and runs the input port helper with set=true.
-   */
-  @Test
-  public void testRootLevelAttributeComplexNameInputPort()
-  {
-    Attribute<?> attribute = PortContext.QUEUE_CAPACITY;
-    String root = StreamingApplication.DT_PREFIX;
-    Object capacity = Integer.valueOf(4096);
-
-    simpleAttributeInputPortHelper(attribute, root, false, capacity, true);
-  }
-
-  /**
-   * Configures PortContext.QUEUE_CAPACITY with value 4096 for input ports under an
-   * "operator" + separator + "*" root (simpleName=false) and runs the input port helper with
-   * set=true.
-   */
-  @Test
-  public void testOperatorLevelAttributeComplexNameInputPort()
-  {
-    Attribute<?> attribute = PortContext.QUEUE_CAPACITY;
-    String root = StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR
-        + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
-    Object capacity = Integer.valueOf(4096);
-
-    simpleAttributeInputPortHelper(attribute, root, false, capacity, true);
-  }
-
-  /**
-   * Configures PortContext.QUEUE_CAPACITY with value 4096 for input ports under an
-   * "application" + separator + "SimpleTestApp" root (simpleName=false) and runs the input port
-   * helper with set=true.
-   */
-  @Test
-  public void testApplicationLevelAttributeComplexNameInputPort()
-  {
-    Attribute<?> attribute = PortContext.QUEUE_CAPACITY;
-    String root = StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR
-        + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR;
-    Object capacity = Integer.valueOf(4096);
-
-    simpleAttributeInputPortHelper(attribute, root, false, capacity, true);
-  }
-
-  /* Output port tests */
-  /**
-   * Configures PortContext.QUEUE_CAPACITY with value 4096 for output ports under the root prefix
-   * (simpleName=true) and runs the output port helper with set=true.
-   */
-  @Test
-  public void testRootLevelAttributeSimpleNameOutputPort()
-  {
-    Attribute<?> attribute = PortContext.QUEUE_CAPACITY;
-    String root = StreamingApplication.DT_PREFIX;
-    Object capacity = Integer.valueOf(4096);
-
-    simpleAttributeOutputPortHelper(attribute, root, true, capacity, true);
-  }
-
-  /**
-   * Configures PortContext.QUEUE_CAPACITY with value 4096 for output ports under an
-   * "operator" + separator + "*" root (simpleName=true) and runs the output port helper with
-   * set=true.
-   */
-  @Test
-  public void testOperatorLevelAttributeSimpleNameOutputPort()
-  {
-    Attribute<?> attribute = PortContext.QUEUE_CAPACITY;
-    String root = StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR
-        + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
-    Object capacity = Integer.valueOf(4096);
-
-    simpleAttributeOutputPortHelper(attribute, root, true, capacity, true);
-  }
-
-  /**
-   * Configures PortContext.QUEUE_CAPACITY with value 4096 for output ports under an
-   * "application" + separator + "SimpleTestApp" root (simpleName=true) and runs the output port
-   * helper with set=true.
-   */
-  @Test
-  public void testApplicationLevelAttributeSimpleNameOutputPort()
-  {
-    Attribute<?> attribute = PortContext.QUEUE_CAPACITY;
-    String root = StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR
-        + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR;
-    Object capacity = Integer.valueOf(4096);
-
-    simpleAttributeOutputPortHelper(attribute, root, true, capacity, true);
-  }
-
-  /**
-   * Configures PortContext.QUEUE_CAPACITY with value 4096 for output ports under the root prefix
-   * (simpleName=false) and runs the output port helper with set=true.
-   */
-  @Test
-  public void testRootLevelAttributeComplexNameOutputPort()
-  {
-    Attribute<?> attribute = PortContext.QUEUE_CAPACITY;
-    String root = StreamingApplication.DT_PREFIX;
-    Object capacity = Integer.valueOf(4096);
-
-    simpleAttributeOutputPortHelper(attribute, root, false, capacity, true);
-  }
-
-  /**
-   * Configures PortContext.QUEUE_CAPACITY with value 4096 for output ports under an
-   * "operator" + separator + "*" root (simpleName=false) and runs the output port helper with
-   * set=true.
-   */
-  @Test
-  public void testOperatorLevelAttributeComplexNameOutputPort()
-  {
-    Attribute<?> attribute = PortContext.QUEUE_CAPACITY;
-    String root = StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR
-        + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
-    Object capacity = Integer.valueOf(4096);
-
-    simpleAttributeOutputPortHelper(attribute, root, false, capacity, true);
-  }
-
-  /**
-   * Configures PortContext.QUEUE_CAPACITY with value 4096 for output ports under an
-   * "application" + separator + "SimpleTestApp" root (simpleName=false) and runs the output port
-   * helper with set=true.
-   */
-  @Test
-  public void testApplicationLevelAttributeComplexNameOutputPort()
-  {
-    Attribute<?> attribute = PortContext.QUEUE_CAPACITY;
-    String root = StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR
-        + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR;
-    Object capacity = Integer.valueOf(4096);
-
-    simpleAttributeOutputPortHelper(attribute, root, false, capacity, true);
-  }
-
-  /* Helpers for building ports */
-  /**
-   * Builds port properties for the attribute's simple name and runs the port assertions.
-   *
-   * @param attributeObj attribute under test
-   * @param root prefix handed to propertiesBuilderPort
-   * @param simpleName flag handed to propertiesBuilderPort
-   * @param val value handed to propertiesBuilderPort and to the assertions
-   * @param set expectation flag forwarded to the assertions
-   * @param scope flag handed to propertiesBuilderPort
-   */
-  private void simpleAttributePortHelper(Attribute<?> attributeObj,
-                                         String root,
-                                         boolean simpleName,
-                                         Object val,
-                                         boolean set,
-                                         boolean scope)
-  {
-    String attributeName = attributeObj.getSimpleName();
-    Properties props = propertiesBuilderPort(attributeName, root, simpleName, val, scope);
-
-    simpleNamePortAssertHelperAssert(attributeObj, props, val, set);
-  }
-
-  /**
-   * Builds input port properties for the attribute's simple name, runs the input port assertions
-   * with the given flag and then the output port assertions with the negated flag.
-   *
-   * @param attributeObj attribute under test
-   * @param root prefix handed to propertiesBuilderInputPort
-   * @param simpleName flag handed to propertiesBuilderInputPort
-   * @param val value handed to propertiesBuilderInputPort and to the assertions
-   * @param set expectation flag for the input ports; the output ports are checked with !set
-   */
-  private void simpleAttributeInputPortHelper(Attribute<?> attributeObj,
-                                              String root,
-                                              boolean simpleName,
-                                              Object val,
-                                              boolean set)
-  {
-    String attributeName = attributeObj.getSimpleName();
-    Properties props = propertiesBuilderInputPort(attributeName, root, simpleName, val);
-
-    boolean expectedOnOutputPorts = !set;
-    simpleNameInputPortAssertHelperAssert(attributeObj, props, val, set);
-    simpleNameOutputPortAssertHelperAssert(attributeObj, props, val, expectedOnOutputPorts);
-  }
-
-  /**
-   * Builds output port properties for the attribute's simple name, runs the output port
-   * assertions with the given flag and then the input port assertions with the negated flag.
-   *
-   * @param attributeObj attribute under test
-   * @param root prefix handed to propertiesBuilderOutputPort
-   * @param simpleName flag handed to propertiesBuilderOutputPort
-   * @param val value handed to propertiesBuilderOutputPort and to the assertions
-   * @param set expectation flag for the output ports; the input ports are checked with !set
-   */
-  private void simpleAttributeOutputPortHelper(Attribute<?> attributeObj,
-                                               String root,
-                                               boolean simpleName,
-                                               Object val,
-                                               boolean set)
-  {
-    String attributeName = attributeObj.getSimpleName();
-    Properties props = propertiesBuilderOutputPort(attributeName, root, simpleName, val);
-
-    boolean expectedOnInputPorts = !set;
-    simpleNameOutputPortAssertHelperAssert(attributeObj, props, val, set);
-    simpleNameInputPortAssertHelperAssert(attributeObj, props, val, expectedOnInputPorts);
-  }
-
-  /**
-   * Prepares a DAG for SimpleTestApplicationWithName from the given properties and calls
-   * simpleNamePortAssertHelper for operator1, operator2 and operator3.
-   *
-   * @param attributeObj attribute handed to simpleNamePortAssertHelper
-   * @param props properties added to the plan configuration before the DAG is prepared
-   * @param val value handed to simpleNamePortAssertHelper
-   * @param set expectation flag handed to simpleNamePortAssertHelper
-   */
-  private void simpleNamePortAssertHelperAssert(Attribute<?> attributeObj,
-                                                Properties props,
-                                                Object val,
-                                                boolean set)
-  {
-    SimpleTestApplicationWithName application = new SimpleTestApplicationWithName();
-
-    LogicalPlanConfiguration planConfiguration = new LogicalPlanConfiguration(new Configuration(false));
-    planConfiguration.addFromProperties(props, null);
-
-    String classResource = application.getClass().getName().replace(".", "/") + ".class";
-
-    LogicalPlan plan = new LogicalPlan();
-    planConfiguration.prepareDAG(plan, application, classResource);
-
-    // the operators are checked one after the other, in this order
-    for (String operatorName : new String[] {"operator1", "operator2", "operator3"}) {
-      simpleNamePortAssertHelper(attributeObj, plan, operatorName, val, set);
-    }
-  }
-
-  /**
-   * Loads {@code props} into a fresh LogicalPlanConfiguration, prepares a LogicalPlan for
-   * SimpleTestApplicationWithName, and checks only the input ports of operator1, operator2
-   * and operator3 against {@code val}.
-   *
-   * @param attributeObj attribute to read from each input port
-   * @param props properties applied to the plan configuration
-   * @param val value compared with each port's attribute value
-   * @param set true to assert equality with {@code val}, false to assert inequality
-   */
-  private void simpleNameInputPortAssertHelperAssert(Attribute<?> attributeObj,
-                                                     Properties props,
-                                                     Object val,
-                                                     boolean set)
-  {
-    SimpleTestApplicationWithName testApp = new SimpleTestApplicationWithName();
-
-    LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(new Configuration(false));
-    planConf.addFromProperties(props, null);
-
-    // Class name with '.' replaced by '/' plus the ".class" suffix.
-    String classResource = testApp.getClass().getName().replace(".", "/") + ".class";
-
-    LogicalPlan plan = new LogicalPlan();
-    planConf.prepareDAG(plan, testApp, classResource);
-
-    for (String operatorName: new String[] {"operator1", "operator2", "operator3"}) {
-      simpleNameInputPortAssertHelper(attributeObj, plan, operatorName, val, set);
-    }
-  }
-
-  /**
-   * Loads {@code props} into a fresh LogicalPlanConfiguration, prepares a LogicalPlan for
-   * SimpleTestApplicationWithName, and checks only the output ports of operator1, operator2
-   * and operator3 against {@code val}.
-   *
-   * @param attributeObj attribute to read from each output port
-   * @param props properties applied to the plan configuration
-   * @param val value compared with each port's attribute value
-   * @param set true to assert equality with {@code val}, false to assert inequality
-   */
-  private void simpleNameOutputPortAssertHelperAssert(Attribute<?> attributeObj,
-                                                      Properties props,
-                                                      Object val,
-                                                      boolean set)
-  {
-    SimpleTestApplicationWithName testApp = new SimpleTestApplicationWithName();
-
-    LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(new Configuration(false));
-    planConf.addFromProperties(props, null);
-
-    // Class name with '.' replaced by '/' plus the ".class" suffix.
-    String classResource = testApp.getClass().getName().replace(".", "/") + ".class";
-
-    LogicalPlan plan = new LogicalPlan();
-    planConf.prepareDAG(plan, testApp, classResource);
-
-    for (String operatorName: new String[] {"operator1", "operator2", "operator3"}) {
-      simpleNameOutputPortAssertHelper(attributeObj, plan, operatorName, val, set);
-    }
-  }
-
-  /**
-   * Checks the attribute on the named operator's input ports and then on its output ports,
-   * using the same expected value and expectation for both.
-   *
-   * @param attributeObj attribute to read from each port
-   * @param dag plan containing the operator
-   * @param operatorName name of the operator to look up in {@code dag}
-   * @param queueCapacity value compared with each port's attribute value
-   * @param set true to assert equality, false to assert inequality
-   */
-  private void simpleNamePortAssertHelper(Attribute<?> attributeObj,
-                                          LogicalPlan dag,
-                                          String operatorName,
-                                          Object queueCapacity,
-                                          boolean set)
-  {
-    // Inputs first, then outputs.
-    simpleNameInputPortAssertHelper(attributeObj, dag, operatorName, queueCapacity, set);
-    simpleNameOutputPortAssertHelper(attributeObj, dag, operatorName, queueCapacity, set);
-  }
-
-  /**
-   * For every input port in the key set of the operator's input streams, asserts that the
-   * port's value for {@code attributeObj} equals {@code queueCapacity} when {@code set} is
-   * true, and differs from it when {@code set} is false.
-   *
-   * @param attributeObj attribute to read from each input port
-   * @param dag plan containing the operator
-   * @param operatorName name of the operator to look up in {@code dag}
-   * @param queueCapacity value compared with each port's attribute value
-   * @param set true to assert equality, false to assert inequality
-   */
-  private void simpleNameInputPortAssertHelper(Attribute<?> attributeObj,
-                                               LogicalPlan dag,
-                                               String operatorName,
-                                               Object queueCapacity,
-                                               boolean set)
-  {
-    for (InputPortMeta port: dag.getOperatorMeta(operatorName).getInputStreams().keySet()) {
-      Object actual = port.getValue(attributeObj);
-
-      if (!set) {
-        Assert.assertNotEquals(queueCapacity, actual);
-        continue;
-      }
-
-      Assert.assertEquals(queueCapacity, actual);
-    }
-  }
-
-  /**
-   * For every output port in the key set of the operator's output streams, asserts that the
-   * port's value for {@code attributeObj} equals {@code queueCapacity} when {@code set} is
-   * true, and differs from it when {@code set} is false.
-   *
-   * @param attributeObj attribute to read from each output port
-   * @param dag plan containing the operator
-   * @param operatorName name of the operator to look up in {@code dag}
-   * @param queueCapacity value compared with each port's attribute value
-   * @param set true to assert equality, false to assert inequality
-   */
-  private void simpleNameOutputPortAssertHelper(Attribute<?> attributeObj,
-                                                LogicalPlan dag,
-                                                String operatorName,
-                                                Object queueCapacity,
-                                                boolean set)
-  {
-    for (OutputPortMeta port: dag.getOperatorMeta(operatorName).getOutputStreams().keySet()) {
-      Object actual = port.getValue(attributeObj);
-
-      if (!set) {
-        Assert.assertNotEquals(queueCapacity, actual);
-        continue;
-      }
-
-      Assert.assertEquals(queueCapacity, actual);
-    }
-  }
-
-  /* Helpers for building properties */
-  /**
-   * Creates a Properties object holding a single attribute entry. The key is
-   * {@code root}, followed by StramElement.ATTR.getValue(), the key separator and the
-   * attribute name; the attribute name is prefixed with {@code contextClass} and a
-   * separator when {@code contextClass} is not null. The value is {@code val.toString()}.
-   *
-   * @param attributeName simple name of the attribute
-   * @param root prefix of the property key
-   * @param contextClass context class name used to qualify the attribute, or null for none
-   * @param val value whose string form is stored; must not be null
-   * @return properties containing exactly the one entry described above
-   */
-  private Properties propertiesBuilder(String attributeName,
-                                       String root,
-                                       String contextClass,
-                                       Object val)
-  {
-    String qualifiedName = attributeName;
-
-    if (contextClass != null) {
-      qualifiedName = contextClass + LogicalPlanConfiguration.KEY_SEPARATOR + attributeName;
-    }
-
-    Properties props = new Properties();
-
-    String propName = root + StramElement.ATTR.getValue()
-                      + LogicalPlanConfiguration.KEY_SEPARATOR + qualifiedName;
-    String propValue = val.toString();
-
-    LOG.debug("adding prop {} with value {}", propName, propValue);
-
-    props.put(propName, propValue);
-    return props;
-  }
-
-  /**
-   * Builds a single-entry Properties object for an operator attribute via propertiesBuilder.
-   *
-   * @param attributeName simple name of the attribute
-   * @param root prefix of the property key
-   * @param simpleName when false, the attribute is qualified with the canonical name of
-   *        OperatorContext
-   * @param val value whose string form is stored
-   * @param addOperator when true, "operator", a separator, "*" and a separator are appended
-   *        to {@code root} before the key is built
-   * @return the properties produced by propertiesBuilder
-   */
-  private Properties propertiesBuilderOperator(String attributeName,
-                                               String root,
-                                               boolean simpleName,
-                                               Object val,
-                                               boolean addOperator)
-  {
-    String contextClass = null;
-    if (!simpleName) {
-      contextClass = OperatorContext.class.getCanonicalName();
-    }
-
-    String attributeRoot = root;
-    if (addOperator) {
-      attributeRoot = root + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR
-                      + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
-    }
-
-    return propertiesBuilder(attributeName, attributeRoot, contextClass, val);
-  }
-
-  /**
-   * Builds a single-entry Properties object for a port attribute via propertiesBuilder.
-   *
-   * @param attributeName simple name of the attribute
-   * @param root prefix of the property key
-   * @param simpleName when false, the attribute is qualified with the canonical name of
-   *        PortContext
-   * @param val value whose string form is stored
-   * @param addPort when true, "port", a separator, "*" and a separator are appended to
-   *        {@code root} before the key is built
-   * @return the properties produced by propertiesBuilder
-   */
-  private Properties propertiesBuilderPort(String attributeName,
-                                           String root,
-                                           boolean simpleName,
-                                           Object val,
-                                           boolean addPort)
-  {
-    String contextClass = null;
-    if (!simpleName) {
-      contextClass = PortContext.class.getCanonicalName();
-    }
-
-    String attributeRoot = root;
-    if (addPort) {
-      attributeRoot = root + "port" + LogicalPlanConfiguration.KEY_SEPARATOR
-                      + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
-    }
-
-    return propertiesBuilder(attributeName, attributeRoot, contextClass, val);
-  }
-
-  /**
-   * Builds a single-entry Properties object for an input-port attribute via
-   * propertiesBuilder. "inputport", a separator, "*" and a separator are always appended
-   * to {@code root} before the key is built.
-   *
-   * @param attributeName simple name of the attribute
-   * @param root prefix of the property key
-   * @param simpleName when false, the attribute is qualified with the canonical name of
-   *        PortContext
-   * @param val value whose string form is stored
-   * @return the properties produced by propertiesBuilder
-   */
-  private Properties propertiesBuilderInputPort(String attributeName,
-                                                String root,
-                                                boolean simpleName,
-                                                Object val)
-  {
-    String contextClass = null;
-    if (!simpleName) {
-      contextClass = PortContext.class.getCanonicalName();
-    }
-
-    String inputPortRoot = root + "inputport" + LogicalPlanConfiguration.KEY_SEPARATOR
-                           + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
-
-    return propertiesBuilder(attributeName, inputPortRoot, contextClass, val);
-  }
-
-  /**
-   * Builds a single-entry Properties object for an output-port attribute via
-   * propertiesBuilder. "outputport", a separator, "*" and a separator are always appended
-   * to {@code root} before the key is built.
-   *
-   * @param attributeName simple name of the attribute
-   * @param root prefix of the property key
-   * @param simpleName when false, the attribute is qualified with the canonical name of
-   *        PortContext
-   * @param val value whose string form is stored
-   * @return the properties produced by propertiesBuilder
-   */
-  private Properties propertiesBuilderOutputPort(String attributeName,
-                                                 String root,
-                                                 boolean simpleName,
-                                                 Object val)
-  {
-    String contextClass = null;
-    if (!simpleName) {
-      contextClass = PortContext.class.getCanonicalName();
-    }
-
-    String outputPortRoot = root + "outputport" + LogicalPlanConfiguration.KEY_SEPARATOR
-                            + "*" + LogicalPlanConfiguration.KEY_SEPARATOR;
-
-    return propertiesBuilder(attributeName, outputPortRoot, contextClass, val);
-  }
-
-  // NOTE(review): a second logger for this same class, LOG, is declared at the end of the
-  // class, and the property-building helper above logs through LOG. Confirm whether
-  // 'logger' is still referenced before consolidating the two.
-  private static final Logger logger = LoggerFactory.getLogger(LogicalPlanConfigurationTest.class);
-
-  /**
-   * Application stub that exposes bean-style properties: three Integer values and one
-   * nested object, each with a getter and a setter. Its populateDAG adds nothing to the DAG.
-   */
-  public static class TestApplication implements StreamingApplication
-  {
-    Integer testprop1;
-    Integer testprop2;
-    Integer testprop3;
-    TestInnerClass inncls;
-
-    /** Creates the application with a fresh TestInnerClass instance in {@code inncls}. */
-    public TestApplication()
-    {
-      this.inncls = new TestInnerClass();
-    }
-
-    public Integer getTestprop1()
-    {
-      return this.testprop1;
-    }
-
-    public void setTestprop1(Integer testprop1)
-    {
-      this.testprop1 = testprop1;
-    }
-
-    public Integer getTestprop2()
-    {
-      return this.testprop2;
-    }
-
-    public void setTestprop2(Integer testprop2)
-    {
-      this.testprop2 = testprop2;
-    }
-
-    public Integer getTestprop3()
-    {
-      return this.testprop3;
-    }
-
-    public void setTestprop3(Integer testprop3)
-    {
-      this.testprop3 = testprop3;
-    }
-
-    public TestInnerClass getInncls()
-    {
-      return this.inncls;
-    }
-
-    public void setInncls(TestInnerClass inncls)
-    {
-      this.inncls = inncls;
-    }
-
-    /** Intentionally leaves the DAG untouched. */
-    @Override
-    public void populateDAG(DAG dag, Configuration conf)
-    {
-    }
-
-    /** Inner (non-static) holder of a single Integer property {@code a}. */
-    public class TestInnerClass
-    {
-      Integer a;
-
-      public Integer getA()
-      {
-        return this.a;
-      }
-
-      public void setA(Integer a)
-      {
-        this.a = a;
-      }
-    }
-  }
-
-  /**
-   * StatsListener stub with a single int property. processStats always returns null;
-   * equality and hash code depend only on {@code intProp}.
-   */
-  public static class TestStatsListener implements StatsListener
-  {
-    private int intProp;
-
-    public TestStatsListener()
-    {
-    }
-
-    /** Ignores the supplied stats and returns null. */
-    @Override
-    public Response processStats(BatchedOperatorStats stats)
-    {
-      return null;
-    }
-
-    public int getIntProp()
-    {
-      return this.intProp;
-    }
-
-    public void setIntProp(int intProp)
-    {
-      this.intProp = intProp;
-    }
-
-    @Override
-    public int hashCode()
-    {
-      // Same value as the expanded form: 31 * 1 + intProp.
-      return 31 + this.intProp;
-    }
-
-    /** Two listeners are equal when they have the same runtime class and the same intProp. */
-    @Override
-    public boolean equals(Object obj)
-    {
-      if (obj == this) {
-        return true;
-      }
-      if (obj == null || obj.getClass() != getClass()) {
-        return false;
-      }
-      return ((TestStatsListener)obj).intProp == this.intProp;
-    }
-  }
-
-  /**
-   * Empty class with no fields or methods.
-   * NOTE(review): presumably referenced by class name from tests outside this section -
-   * confirm against its usages.
-   */
-  public static class TestSchema
-  {
-  }
-
-  /**
-   * Three-operator test application: operator1 feeds operator2 through stream "s1", and
-   * operator2 feeds both input ports of operator3 through stream "s2".
-   */
-  public static class SimpleTestApplication implements StreamingApplication
-  {
-    public final GenericTestOperator gt1 = new GenericTestOperator();
-    public final GenericTestOperator gt2 = new GenericTestOperator();
-    public final GenericTestOperator gt3 = new GenericTestOperator();
-
-    @Override
-    public void populateDAG(DAG dag, Configuration conf)
-    {
-      // Register the operators under the names the assertion helpers look up.
-      dag.addOperator("operator1", this.gt1);
-      dag.addOperator("operator2", this.gt2);
-      dag.addOperator("operator3", this.gt3);
-
-      // s1: operator1 -> operator2; s2: operator2 -> both inputs of operator3.
-      dag.addStream("s1", this.gt1.outport1, this.gt2.inport1);
-      dag.addStream("s2", this.gt2.outport1, this.gt3.inport1, this.gt3.inport2);
-    }
-  }
-
-  /** SimpleTestApplication variant annotated with the application name "SimpleTestApp". */
-  @ApplicationAnnotation(name = "SimpleTestApp")
-  public static class SimpleTestApplicationWithName extends SimpleTestApplication
-  {
-  }
-
-  // NOTE(review): another logger for this same class, 'logger', is declared earlier in the
-  // class. Confirm which of the two is still in use before consolidating them.
-  private static final Logger LOG = LoggerFactory.getLogger(LogicalPlanConfigurationTest.class);
-}
-


Mime
View raw message