apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From csi...@apache.org
Subject [1/8] incubator-apex-core git commit: APEX-28 #resolve
Date Thu, 17 Sep 2015 04:58:54 GMT
Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 7503dde51 -> 454feccac


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/434a7170/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java
index 3b6bdd1..218156b 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java
@@ -15,28 +15,6 @@
  */
 package com.datatorrent.stram.plan;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.lang.reflect.Field;
-import java.util.*;
-
-import javax.validation.ValidationException;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import org.codehaus.jettison.json.JSONObject;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.mutable.MutableBoolean;
-import org.apache.hadoop.conf.Configuration;
-
 import com.datatorrent.api.*;
 import com.datatorrent.api.Attribute.AttributeMap.AttributeInitializer;
 import com.datatorrent.api.Context.DAGContext;
@@ -46,20 +24,48 @@ import com.datatorrent.api.StringCodec.Integer2String;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.common.codec.JsonStreamCodec;
 import com.datatorrent.common.util.BasicContainerOptConfigurator;
+import com.datatorrent.common.util.FSStorageAgent;
 import com.datatorrent.stram.PartitioningTest.PartitionLoadWatch;
 import com.datatorrent.stram.client.StramClientUtils;
 import com.datatorrent.stram.engine.GenericTestOperator;
 import com.datatorrent.stram.engine.TestGeneratorInputOperator;
 import com.datatorrent.stram.plan.LogicalPlanTest.ValidationTestOperator;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.AttributeParseUtils;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.ConfElement;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.ContextUtils;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.StramElement;
+import com.datatorrent.stram.plan.logical.MockStorageAgent;
 import com.datatorrent.stram.support.StramTestSupport.RegexMatcher;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.lang.reflect.Field;
+import javax.validation.ValidationException;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.hadoop.conf.Configuration;
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
 
 import static org.junit.Assert.*;
 
+
+
 public class LogicalPlanConfigurationTest {
 
   private static OperatorMeta assertNode(LogicalPlan dag, String id) {
@@ -75,7 +81,6 @@ public class LogicalPlanConfigurationTest {
   public void testLoadFromConfigXml() {
     Configuration conf = new Configuration(false);
     conf.addResource(StramClientUtils.DT_SITE_XML_FILE);
-    //Configuration.dumpConfiguration(conf, new PrintWriter(System.out));
 
     LogicalPlanConfiguration builder = new LogicalPlanConfiguration(conf);
 
@@ -117,7 +122,7 @@ public class LogicalPlanConfigurationTest {
     assertEquals("operator 2 number of outputs", 1, operator2.getOutputStreams().size());
     StreamMeta fromNode2 = operator2.getOutputStreams().values().iterator().next();
 
-    Set<OperatorMeta> targetNodes = new HashSet<OperatorMeta>();
+    Set<OperatorMeta> targetNodes = Sets.newHashSet();
     for (LogicalPlan.InputPortMeta ip : fromNode2.getSinks()) {
       targetNodes.add(ip.getOperatorWrapper());
     }
@@ -191,7 +196,7 @@ public class LogicalPlanConfigurationTest {
       StreamMeta input1 = dag.getStream("inputStream");
       assertNotNull(input1);
       Assert.assertEquals("input1 source", dag.getOperatorMeta("inputOperator"), input1.getSource().getOperatorMeta());
-      Set<OperatorMeta> targetNodes = new HashSet<OperatorMeta>();
+      Set<OperatorMeta> targetNodes = Sets.newHashSet();
       for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) {
         targetNodes.add(targetPort.getOperatorWrapper());
       }
@@ -221,11 +226,11 @@ public class LogicalPlanConfigurationTest {
     dag.validate();
 
     assertEquals("DAG attribute CONTAINER_JVM_OPTIONS ", dag.getAttributes().get(DAGContext.CONTAINER_JVM_OPTIONS), "-Xmx16m");
-    Map<Class<?>, Class<? extends StringCodec<?>>> stringCodecsMap = new HashMap<Class<?>, Class<? extends StringCodec<?>>>();
+    Map<Class<?>, Class<? extends StringCodec<?>>> stringCodecsMap = Maps.newHashMap();
     stringCodecsMap.put(Integer.class, Integer2String.class);
     assertEquals("DAG attribute STRING_CODECS ", stringCodecsMap, dag.getAttributes().get(DAGContext.STRING_CODECS));
     assertEquals("DAG attribute CONTAINER_OPTS_CONFIGURATOR ", BasicContainerOptConfigurator.class, dag.getAttributes().get(DAGContext.CONTAINER_OPTS_CONFIGURATOR).getClass());
-    
+
     assertEquals("number of operator confs", 5, dag.getAllOperators().size());
     assertEquals("number of root operators", 1, dag.getRootOperators().size());
 
@@ -241,7 +246,7 @@ public class LogicalPlanConfigurationTest {
     for(OutputPortMeta opm : input.getOutputStreams().keySet()){
       assertTrue("output port of input Operator attribute is JsonStreamCodec ", opm.getAttributes().get(PortContext.STREAM_CODEC) instanceof JsonStreamCodec<?>);
     }
-    
+
     OperatorMeta operator3 = dag.getOperatorMeta("operator3");
     assertEquals("operator3.classname", GenericTestOperator.class, operator3.getOperator().getClass());
 
@@ -259,7 +264,7 @@ public class LogicalPlanConfigurationTest {
     assertNotNull(input1);
     OperatorMeta inputOperator = dag.getOperatorMeta("inputOperator");
     Assert.assertEquals("input1 source", inputOperator, input1.getSource().getOperatorMeta());
-    Set<OperatorMeta> targetNodes = new HashSet<OperatorMeta>();
+    Set<OperatorMeta> targetNodes = Sets.newHashSet();
     for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) {
       targetNodes.add(targetPort.getOperatorWrapper());
     }
@@ -269,6 +274,7 @@ public class LogicalPlanConfigurationTest {
   }
 
   @Test
+  @SuppressWarnings("UnnecessaryBoxing")
   public void testAppLevelAttributes()
   {
     String appName = "app1";
@@ -296,6 +302,7 @@ public class LogicalPlanConfigurationTest {
 
   }
   @Test
+  @SuppressWarnings("UnnecessaryBoxing")
   public void testAppLevelProperties() {
 	  String appName ="app1";
 	  Properties props =new Properties();
@@ -315,6 +322,7 @@ public class LogicalPlanConfigurationTest {
 	  Assert.assertEquals("",Integer.valueOf(1000),app1Test.getTestprop3());
 	  Assert.assertEquals("",Integer.valueOf(10000),app1Test.getInncls().getA());
   }
+
   @Test
   public void testPrepareDAG() {
     final MutableBoolean appInitialized = new MutableBoolean(false);
@@ -365,6 +373,7 @@ public class LogicalPlanConfigurationTest {
     Operator operator3 = dag.addOperator("operator3", new GenericTestOperator());
 
     LogicalPlanConfiguration pb = new LogicalPlanConfiguration(new Configuration(false));
+    LOG.debug("calling addFromProperties");
     pb.addFromProperties(props, null);
 
     Map<String, String> configProps = pb.getProperties(dag.getMeta(operator1), "appName");
@@ -415,7 +424,6 @@ public class LogicalPlanConfigurationTest {
     @Override
     public void populateDAG(DAG dag, Configuration conf)
     {
-      //dag.setAttribute(DAGContext.APPLICATION_NAME, "testApp");
     }
 
   }
@@ -478,6 +486,7 @@ public class LogicalPlanConfigurationTest {
   }
 
   @Test
+  @SuppressWarnings( {"UnnecessaryBoxing", "AssertEqualsBetweenInconvertibleTypes"})
   public void testOperatorLevelAttributes() {
     String appName = "app1";
     StreamingApplication app = new StreamingApplication() {
@@ -581,22 +590,10 @@ public class LogicalPlanConfigurationTest {
   }
 
   @Test
+  @SuppressWarnings("UnnecessaryBoxing")
   public void testPortLevelAttributes() {
     String appName = "app1";
-    final GenericTestOperator gt1 = new GenericTestOperator();
-    final GenericTestOperator gt2 = new GenericTestOperator();
-    final GenericTestOperator gt3 = new GenericTestOperator();
-    StreamingApplication app = new StreamingApplication() {
-      @Override
-      public void populateDAG(DAG dag, Configuration conf)
-      {
-        dag.addOperator("operator1", gt1);
-        dag.addOperator("operator2", gt2);
-        dag.addOperator("operator3", gt3);
-        dag.addStream("s1", gt1.outport1, gt2.inport1);
-        dag.addStream("s2", gt2.outport1, gt3.inport1, gt3.inport2);
-      }
-    };
+    SimpleTestApplication app = new SimpleTestApplication();
 
     Properties props = new Properties();
     props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
@@ -613,33 +610,31 @@ public class LogicalPlanConfigurationTest {
 
     LogicalPlan dag = new LogicalPlan();
     dagBuilder.prepareDAG(dag, app, appPath);
-    //dagBuilder.populateDAG(dag, new Configuration(false));
 
-    //dagBuilder.setApplicationConfiguration(dag, appName);
     OperatorMeta om1 = dag.getOperatorMeta("operator1");
-    Assert.assertEquals("", Integer.valueOf(16 * 1024), om1.getMeta(gt1.outport1).getValue(PortContext.QUEUE_CAPACITY));
+    Assert.assertEquals("", Integer.valueOf(16 * 1024), om1.getMeta(app.gt1.outport1).getValue(PortContext.QUEUE_CAPACITY));
     OperatorMeta om2 = dag.getOperatorMeta("operator2");
-    Assert.assertEquals("", Integer.valueOf(32 * 1024), om2.getMeta(gt2.inport1).getValue(PortContext.QUEUE_CAPACITY));
-    Assert.assertEquals("", Integer.valueOf(32 * 1024), om2.getMeta(gt2.outport1).getValue(PortContext.QUEUE_CAPACITY));
+    Assert.assertEquals("", Integer.valueOf(32 * 1024), om2.getMeta(app.gt2.inport1).getValue(PortContext.QUEUE_CAPACITY));
+    Assert.assertEquals("", Integer.valueOf(32 * 1024), om2.getMeta(app.gt2.outport1).getValue(PortContext.QUEUE_CAPACITY));
     OperatorMeta om3 = dag.getOperatorMeta("operator3");
-    Assert.assertEquals("", Integer.valueOf(16 * 1024), om3.getMeta(gt3.inport1).getValue(PortContext.QUEUE_CAPACITY));
-    Assert.assertEquals("", Integer.valueOf(32 * 1024), om3.getMeta(gt3.inport2).getValue(PortContext.QUEUE_CAPACITY));
+    Assert.assertEquals("", Integer.valueOf(16 * 1024), om3.getMeta(app.gt3.inport1).getValue(PortContext.QUEUE_CAPACITY));
+    Assert.assertEquals("", Integer.valueOf(32 * 1024), om3.getMeta(app.gt3.inport2).getValue(PortContext.QUEUE_CAPACITY));
   }
 
 
   @Test
   public void testInvalidAttribute() throws Exception {
-
     Assert.assertNotSame(0, com.datatorrent.api.Context.DAGContext.serialVersionUID);
-    Set<Attribute<Object>> appAttributes = AttributeInitializer.getAttributes(com.datatorrent.api.Context.DAGContext.class);
-    Attribute<Object> attribute = new Attribute<Object>("", null);
+    Attribute<String> attribute = new Attribute<>("", null);
 
     Field nameField = Attribute.class.getDeclaredField("name");
     nameField.setAccessible(true);
     nameField.set(attribute, "NOT_CONFIGURABLE");
     nameField.setAccessible(false);
 
-    appAttributes.add(attribute);
+    ContextUtils.addAttribute(com.datatorrent.api.Context.DAGContext.class, attribute);
+    AttributeParseUtils.initialize();
+    ConfElement.initialize();
 
     // attribute that cannot be configured
 
@@ -656,7 +651,9 @@ public class LogicalPlanConfigurationTest {
       Assert.assertThat("Attribute not configurable", e.getMessage(), RegexMatcher.matches("Attribute does not support property configuration: NOT_CONFIGURABLE.*"));
     }
 
-    appAttributes.remove(attribute);
+    ContextUtils.removeAttribute(com.datatorrent.api.Context.DAGContext.class, attribute);
+    AttributeParseUtils.initialize();
+    ConfElement.initialize();
 
     // invalid attribute name
     props = new Properties();
@@ -667,9 +664,9 @@ public class LogicalPlanConfigurationTest {
       new LogicalPlanConfiguration(new Configuration(false)).addFromProperties(props, null);
       Assert.fail("Exception expected");
     } catch (Exception e) {
+      LOG.debug("Exception message: {}", e.getMessage());
       Assert.assertThat("Invalid attribute name", e.getMessage(), RegexMatcher.matches("Invalid attribute reference: " + invalidAttribute));
     }
-
   }
 
   @Test
@@ -759,6 +756,898 @@ public class LogicalPlanConfigurationTest {
     dag.validate();
   }
 
+  /**
+   * This test and all of the following ambiguous attribute tests verify that when an ambiguous attribute
+   * name is provided, all the corresponding attributes are set.
+   * <br/><br/>
+   * <b>Note:</b> An attribute is ambiguous when multiple attributes with the same
+   * simple name exist for multiple types of dag elements (like operators and ports).
+   * Examples of such attributes are com.datatorrent.api.Context.OperatorContext.AUTO_RECORD
+   * and com.datatorrent.api.Context.PortContext.AUTO_RECORD.
+   * <br/><br/>
+   * This test should set the attribute on the operators and ports.
+   */
+  /**
+   * This test should set the attribute on the operators and ports.
+   */
+  @Test
+  public void testRootLevelAmbiguousAttributeSimple()
+  {
+    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD,
+                                       Context.PortContext.AUTO_RECORD,
+                                       StreamingApplication.DT_PREFIX,
+                                       null,
+                                       Boolean.TRUE,
+                                       true,
+                                       true);
+  }
+
+  /**
+   * This test should set the attribute on the operators and ports.
+   */
+  @Test
+  public void testApplicationLevelAmbiguousAttributeSimple()
+  {
+    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD,
+                                       Context.PortContext.AUTO_RECORD,
+                                       StreamingApplication.DT_PREFIX
+                                       + "application"
+                                       + LogicalPlanConfiguration.KEY_SEPARATOR
+                                       + "*"
+                                       + LogicalPlanConfiguration.KEY_SEPARATOR,
+                                       null,
+                                       Boolean.TRUE,
+                                       true,
+                                       true);
+  }
+
+  /**
+   * This should only set the attribute on the operator
+   */
+  @Test
+  public void testOperatorLevelAmbiguousAttributeSimple()
+  {
+    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD,
+                                       Context.PortContext.AUTO_RECORD,
+                                       StreamingApplication.DT_PREFIX
+                                       + "operator"
+                                       + LogicalPlanConfiguration.KEY_SEPARATOR
+                                       + "*"
+                                       + LogicalPlanConfiguration.KEY_SEPARATOR,
+                                       null,
+                                       Boolean.TRUE,
+                                       true,
+                                       false);
+  }
+
+  /**
+   * This should only set the attribute on the port
+   */
+  @Test
+  public void testPortLevelAmbiguousAttributeSimple()
+  {
+    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD,
+                                       Context.PortContext.AUTO_RECORD,
+                                       StreamingApplication.DT_PREFIX
+                                       + "port"
+                                       + LogicalPlanConfiguration.KEY_SEPARATOR
+                                       + "*"
+                                       + LogicalPlanConfiguration.KEY_SEPARATOR,
+                                       null,
+                                       Boolean.TRUE,
+                                       false,
+                                       true);
+  }
+
+  /**
+   * This should only set the attribute on the port
+   */
+  @Test
+  public void testRootLevelAmbiguousAttributeComplex()
+  {
+    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD,
+                                       Context.PortContext.AUTO_RECORD,
+                                       StreamingApplication.DT_PREFIX,
+                                       PortContext.class.getCanonicalName(),
+                                       Boolean.TRUE,
+                                       false,
+                                       true);
+  }
+
+  /**
+   * This should only set the attribute on the port
+   */
+  @Test
+  public void testApplicationLevelAmbiguousAttributeComplex()
+  {
+    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD,
+                                       Context.PortContext.AUTO_RECORD,
+                                       StreamingApplication.DT_PREFIX
+                                       + "application"
+                                       + LogicalPlanConfiguration.KEY_SEPARATOR
+                                       + "*"
+                                       + LogicalPlanConfiguration.KEY_SEPARATOR,
+                                       PortContext.class.getCanonicalName(),
+                                       Boolean.TRUE,
+                                       false,
+                                       true);
+  }
+
+  /**
+   * This should only set the attribute on the operator
+   */
+  @Test
+  public void testOperatorLevelAmbiguousAttributeComplex()
+  {
+    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD,
+                                       Context.PortContext.AUTO_RECORD,
+                                       StreamingApplication.DT_PREFIX
+                                       + "operator"
+                                       + LogicalPlanConfiguration.KEY_SEPARATOR
+                                       + "*"
+                                       + LogicalPlanConfiguration.KEY_SEPARATOR,
+                                       OperatorContext.class.getCanonicalName(),
+                                       Boolean.TRUE,
+                                       true,
+                                       false);
+  }
+
+  /**
+   * This should only set the attribute on the port
+   */
+  @Test
+  public void testOperatorLevelAmbiguousAttributeComplex2()
+  {
+    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD,
+                                       Context.PortContext.AUTO_RECORD,
+                                       StreamingApplication.DT_PREFIX
+                                       + "operator"
+                                       + LogicalPlanConfiguration.KEY_SEPARATOR
+                                       + "*"
+                                       + LogicalPlanConfiguration.KEY_SEPARATOR,
+                                       PortContext.class.getCanonicalName(),
+                                       Boolean.TRUE,
+                                       false,
+                                       true);
+  }
+
+  /**
+   * This should only set the attribute on the port
+   */
+  @Test
+  public void testPortLevelAmbiguousAttributeComplex()
+  {
+    testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD,
+                                       Context.PortContext.AUTO_RECORD,
+                                       StreamingApplication.DT_PREFIX
+                                       + "port"
+                                       + LogicalPlanConfiguration.KEY_SEPARATOR
+                                       + "*"
+                                       + LogicalPlanConfiguration.KEY_SEPARATOR,
+                                       PortContext.class.getCanonicalName(),
+                                       Boolean.TRUE,
+                                       false,
+                                       true);
+  }
+
+  private void testAttributeAmbiguousSimpleHelper(Attribute<?> attributeObjOperator,
+                                                  Attribute<?> attributeObjPort,
+                                                  String root,
+                                                  String contextClass,
+                                                  Object val,
+                                                  boolean operatorSet,
+                                                  boolean portSet)
+  {
+    Properties props = propertiesBuilder(attributeObjOperator.getSimpleName(),
+                                         root,
+                                         contextClass,
+                                         val);
+
+    simpleAttributeOperatorHelperAssert(attributeObjOperator,
+                                        props,
+                                        val,
+                                        operatorSet);
+
+    simpleNamePortAssertHelperAssert(attributeObjPort,
+                                     props,
+                                     val,
+                                     portSet);
+  }
+
+  @Test
+  public void testRootLevelAttributeSimpleNameOperator()
+  {
+    simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB,
+                                  StreamingApplication.DT_PREFIX,
+                                  true,
+                                  (Integer)4096,
+                                  true,
+                                  true);
+  }
+
+  @Test
+  public void testRootLevelStorageAgentSimpleNameOperator()
+  {
+    MockStorageAgent mockAgent = new MockStorageAgent();
+
+    simpleAttributeOperatorHelper(OperatorContext.STORAGE_AGENT,
+                                  StreamingApplication.DT_PREFIX,
+                                  true,
+                                  mockAgent,
+                                  true,
+                                  false);
+  }
+
+  @Test
+  public void testRootLevelAttributeSimpleNameOperatorNoScope()
+  {
+    simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB,
+                                  StreamingApplication.DT_PREFIX,
+                                  true,
+                                  (Integer)4096,
+                                  true,
+                                  false);
+  }
+
+  @Test
+  public void testApplicationLevelAttributeSimpleNameOperator()
+  {
+    simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB,
+                                  StreamingApplication.DT_PREFIX
+                                  + "application"
+                                  + LogicalPlanConfiguration.KEY_SEPARATOR
+                                  + "SimpleTestApp"
+                                  + LogicalPlanConfiguration.KEY_SEPARATOR,
+                                  true,
+                                  (Integer)4096,
+                                  true,
+                                  true);
+  }
+
+  private void simpleAttributeOperatorHelper(Attribute<?> attributeObj,
+                                             String root,
+                                             boolean simpleName,
+                                             Object val,
+                                             boolean set,
+                                             boolean scope)
+  {
+    Properties props = propertiesBuilderOperator(attributeObj.getSimpleName(),
+                                                 root,
+                                                 simpleName,
+                                                 val,
+                                                 scope);
+
+    simpleAttributeOperatorHelperAssert(attributeObj,
+                                        props,
+                                        val,
+                                        set);
+  }
+
+  private void simpleAttributeOperatorHelperAssert(Attribute<?> attributeObj,
+                                                   Properties props,
+                                                   Object val,
+                                                   boolean set)
+  {
+    SimpleTestApplicationWithName app = new SimpleTestApplicationWithName();
+
+    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
+    dagBuilder.addFromProperties(props, null);
+
+    String appPath = app.getClass().getName().replace(".", "/") + ".class";
+
+    LogicalPlan dag = new LogicalPlan();
+    dagBuilder.prepareDAG(dag, app, appPath);
+
+    OperatorMeta om1 = dag.getOperatorMeta("operator1");
+
+    if (set) {
+      Assert.assertEquals(val, om1.getValue(attributeObj));
+    } else {
+      Assert.assertNotEquals(val, om1.getValue(attributeObj));
+    }
+
+    OperatorMeta om2 = dag.getOperatorMeta("operator2");
+
+    if (set) {
+      Assert.assertEquals(val, om2.getValue(attributeObj));
+    } else {
+      Assert.assertNotEquals(val, om2.getValue(attributeObj));
+    }
+
+    OperatorMeta om3 = dag.getOperatorMeta("operator3");
+
+    if (set) {
+      Assert.assertEquals(val, om3.getValue(attributeObj));
+    } else {
+      Assert.assertNotEquals(val, om3.getValue(attributeObj));
+    }
+  }
+
+  /* Port tests */
+  @Test
+  public void testRootLevelAttributeSimpleNamePort()
+  {
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY,
+                              StreamingApplication.DT_PREFIX,
+                              true,
+                              (Integer)4096,
+                              true,
+                              true);
+  }
+
+  @Test
+  public void testRootLevelAttributeSimpleNamePortNoScope()
+  {
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY,
+                              StreamingApplication.DT_PREFIX,
+                              true,
+                              (Integer)4096,
+                              true,
+                              false);
+  }
+
+  @Test
+  public void testOperatorLevelAttributeSimpleNamePort()
+  {
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY,
+                              StreamingApplication.DT_PREFIX
+                              + "operator"
+                              + LogicalPlanConfiguration.KEY_SEPARATOR
+                              + "*"
+                              + LogicalPlanConfiguration.KEY_SEPARATOR,
+                              true,
+                              (Integer)4096,
+                              true,
+                              true);
+  }
+
+  @Test
+  public void testApplicationLevelAttributeSimpleNamePort()
+  {
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY,
+                              StreamingApplication.DT_PREFIX
+                              + "application"
+                              + LogicalPlanConfiguration.KEY_SEPARATOR
+                              + "SimpleTestApp"
+                              + LogicalPlanConfiguration.KEY_SEPARATOR,
+                              true,
+                              (Integer)4096,
+                              true,
+                              true);
+  }
+
+  @Test
+  public void testRootLevelAttributeComplexNamePort()
+  {
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY,
+                              StreamingApplication.DT_PREFIX,
+                              false,
+                              (Integer)4096,
+                              true,
+                              true);
+  }
+
+  @Test
+  public void testRootLevelAttributeComplexNamePortNoScope()
+  {
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY,
+                              StreamingApplication.DT_PREFIX,
+                              false,
+                              (Integer)4096,
+                              true,
+                              false);
+  }
+
+  @Test
+  public void testOperatorLevelAttributeComplexNamePort()
+  {
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY,
+                              StreamingApplication.DT_PREFIX
+                              + "operator"
+                              + LogicalPlanConfiguration.KEY_SEPARATOR
+                              + "*"
+                              + LogicalPlanConfiguration.KEY_SEPARATOR,
+                              false,
+                              (Integer)4096,
+                              true,
+                              true);
+  }
+
+  @Test
+  public void testApplicationLevelAttributeComplexNamePort()
+  {
+    simpleAttributePortHelper(PortContext.QUEUE_CAPACITY,
+                              StreamingApplication.DT_PREFIX
+                              + "application"
+                              + LogicalPlanConfiguration.KEY_SEPARATOR
+                              + "SimpleTestApp"
+                              + LogicalPlanConfiguration.KEY_SEPARATOR,
+                              false,
+                              (Integer)4096,
+                              true,
+                              true);
+  }
+
+  /* Input port tests */
+  @Test
+  public void testRootLevelAttributeSimpleNameInputPort()
+  {
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY,
+                                   StreamingApplication.DT_PREFIX,
+                                   true,
+                                   (Integer)4096,
+                                   true);
+  }
+
+  @Test
+  public void testOperatorLevelAttributeSimpleNameInputPort()
+  {
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY,
+                                   StreamingApplication.DT_PREFIX
+                                   + "operator"
+                                   + LogicalPlanConfiguration.KEY_SEPARATOR
+                                   + "*"
+                                   + LogicalPlanConfiguration.KEY_SEPARATOR,
+                                   true,
+                                   (Integer)4096,
+                                   true);
+  }
+
+  @Test
+  public void testApplicationLevelAttributeSimpleNameInputPort()
+  {
+    // Unqualified attribute name under the application named "SimpleTestApp"; input ports only.
+    final String appRoot = StreamingApplication.DT_PREFIX
+                           + "application"
+                           + LogicalPlanConfiguration.KEY_SEPARATOR
+                           + "SimpleTestApp"
+                           + LogicalPlanConfiguration.KEY_SEPARATOR;
+    final Integer capacity = 4096;
+
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, appRoot, true, capacity, true);
+  }
+
+  @Test
+  public void testRootLevelAttributeComplexNameInputPort()
+  {
+    // Class-qualified attribute name directly under the root prefix; only input ports should match.
+    final String root = StreamingApplication.DT_PREFIX;
+    final Integer capacity = 4096;
+
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, root, false, capacity, true);
+  }
+
+  @Test
+  public void testOperatorLevelAttributeComplexNameInputPort()
+  {
+    // Class-qualified attribute name under the wildcard ("*") operator scope; input ports only.
+    final String operatorRoot = StreamingApplication.DT_PREFIX
+                                + "operator"
+                                + LogicalPlanConfiguration.KEY_SEPARATOR
+                                + "*"
+                                + LogicalPlanConfiguration.KEY_SEPARATOR;
+    final Integer capacity = 4096;
+
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, operatorRoot, false, capacity, true);
+  }
+
+  @Test
+  public void testApplicationLevelAttributeComplexNameInputPort()
+  {
+    // Class-qualified attribute name under the application named "SimpleTestApp"; input ports only.
+    final String appRoot = StreamingApplication.DT_PREFIX
+                           + "application"
+                           + LogicalPlanConfiguration.KEY_SEPARATOR
+                           + "SimpleTestApp"
+                           + LogicalPlanConfiguration.KEY_SEPARATOR;
+    final Integer capacity = 4096;
+
+    simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, appRoot, false, capacity, true);
+  }
+
+  /* Output port tests */
+  @Test
+  public void testRootLevelAttributeSimpleNameOutputPort()
+  {
+    // Unqualified attribute name directly under the root prefix; only output ports should match.
+    final String root = StreamingApplication.DT_PREFIX;
+    final Integer capacity = 4096;
+
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, root, true, capacity, true);
+  }
+
+  @Test
+  public void testOperatorLevelAttributeSimpleNameOutputPort()
+  {
+    // Unqualified attribute name under the wildcard ("*") operator scope; output ports only.
+    final String operatorRoot = StreamingApplication.DT_PREFIX
+                                + "operator"
+                                + LogicalPlanConfiguration.KEY_SEPARATOR
+                                + "*"
+                                + LogicalPlanConfiguration.KEY_SEPARATOR;
+    final Integer capacity = 4096;
+
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, operatorRoot, true, capacity, true);
+  }
+
+  @Test
+  public void testApplicationLevelAttributeSimpleNameOutputPort()
+  {
+    // Unqualified attribute name under the application named "SimpleTestApp"; output ports only.
+    final String appRoot = StreamingApplication.DT_PREFIX
+                           + "application"
+                           + LogicalPlanConfiguration.KEY_SEPARATOR
+                           + "SimpleTestApp"
+                           + LogicalPlanConfiguration.KEY_SEPARATOR;
+    final Integer capacity = 4096;
+
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, appRoot, true, capacity, true);
+  }
+
+  @Test
+  public void testRootLevelAttributeComplexNameOutputPort()
+  {
+    // Class-qualified attribute name directly under the root prefix; only output ports should match.
+    final String root = StreamingApplication.DT_PREFIX;
+    final Integer capacity = 4096;
+
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, root, false, capacity, true);
+  }
+
+  @Test
+  public void testOperatorLevelAttributeComplexNameOutputPort()
+  {
+    // Class-qualified attribute name under the wildcard ("*") operator scope; output ports only.
+    final String operatorRoot = StreamingApplication.DT_PREFIX
+                                + "operator"
+                                + LogicalPlanConfiguration.KEY_SEPARATOR
+                                + "*"
+                                + LogicalPlanConfiguration.KEY_SEPARATOR;
+    final Integer capacity = 4096;
+
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, operatorRoot, false, capacity, true);
+  }
+
+  @Test
+  public void testApplicationLevelAttributeComplexNameOutputPort()
+  {
+    // Class-qualified attribute name under the application named "SimpleTestApp"; output ports only.
+    final String appRoot = StreamingApplication.DT_PREFIX
+                           + "application"
+                           + LogicalPlanConfiguration.KEY_SEPARATOR
+                           + "SimpleTestApp"
+                           + LogicalPlanConfiguration.KEY_SEPARATOR;
+    final Integer capacity = 4096;
+
+    simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, appRoot, false, capacity, true);
+  }
+
+  /* Helpers for building ports */
+  private void simpleAttributePortHelper(Attribute<?> attributeObj,
+                                         String root,
+                                         boolean simpleName,
+                                         Object val,
+                                         boolean set,
+                                         boolean scope)
+  {
+    Properties props = propertiesBuilderPort(attributeObj.getSimpleName(),
+                                             root,
+                                             simpleName,
+                                             val,
+                                             scope);
+
+    simpleNamePortAssertHelperAssert(attributeObj,
+                                     props,
+                                     val,
+                                     set);
+  }
+
+  private void simpleAttributeInputPortHelper(Attribute<?> attributeObj,
+                                              String root,
+                                              boolean simpleName,
+                                              Object val,
+                                              boolean set)
+  {
+    Properties props = propertiesBuilderInputPort(attributeObj.getSimpleName(),
+                                                  root,
+                                                  simpleName,
+                                                  val);
+
+    simpleNameInputPortAssertHelperAssert(attributeObj,
+                                          props,
+                                          val,
+                                          set);
+
+    simpleNameOutputPortAssertHelperAssert(attributeObj,
+                                           props,
+                                           val,
+                                           !set);
+  }
+
+  private void simpleAttributeOutputPortHelper(Attribute<?> attributeObj,
+                                               String root,
+                                               boolean simpleName,
+                                               Object val,
+                                               boolean set)
+  {
+    Properties props = propertiesBuilderOutputPort(attributeObj.getSimpleName(),
+                                                   root,
+                                                   simpleName,
+                                                   val);
+
+    simpleNameOutputPortAssertHelperAssert(attributeObj,
+                                           props,
+                                           val,
+                                           set);
+
+    simpleNameInputPortAssertHelperAssert(attributeObj,
+                                          props,
+                                          val,
+                                          !set);
+  }
+
+  private void simpleNamePortAssertHelperAssert(Attribute<?> attributeObj,
+                                                Properties props,
+                                                Object val,
+                                                boolean set)
+  {
+    SimpleTestApplicationWithName app = new SimpleTestApplicationWithName();
+
+    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
+    dagBuilder.addFromProperties(props, null);
+
+    String appPath = app.getClass().getName().replace(".", "/") + ".class";
+
+    LogicalPlan dag = new LogicalPlan();
+    dagBuilder.prepareDAG(dag, app, appPath);
+
+    simpleNamePortAssertHelper(attributeObj,
+                               dag,
+                               "operator1",
+                               val,
+                               set);
+
+    simpleNamePortAssertHelper(attributeObj,
+                               dag,
+                               "operator2",
+                               val,
+                               set);
+
+    simpleNamePortAssertHelper(attributeObj,
+                               dag,
+                               "operator3",
+                               val,
+                               set);
+  }
+
+  private void simpleNameInputPortAssertHelperAssert(Attribute<?> attributeObj,
+                                                     Properties props,
+                                                     Object val,
+                                                     boolean set)
+  {
+    SimpleTestApplicationWithName app = new SimpleTestApplicationWithName();
+
+    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
+    dagBuilder.addFromProperties(props, null);
+
+    String appPath = app.getClass().getName().replace(".", "/") + ".class";
+
+    LogicalPlan dag = new LogicalPlan();
+    dagBuilder.prepareDAG(dag, app, appPath);
+
+    simpleNameInputPortAssertHelper(attributeObj,
+                                    dag,
+                                    "operator1",
+                                    val,
+                                    set);
+
+    simpleNameInputPortAssertHelper(attributeObj,
+                                    dag,
+                                    "operator2",
+                                    val,
+                                    set);
+
+    simpleNameInputPortAssertHelper(attributeObj,
+                                    dag,
+                                    "operator3",
+                                    val,
+                                    set);
+  }
+
+  private void simpleNameOutputPortAssertHelperAssert(Attribute<?> attributeObj,
+                                                      Properties props,
+                                                      Object val,
+                                                      boolean set)
+  {
+    SimpleTestApplicationWithName app = new SimpleTestApplicationWithName();
+
+    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
+    dagBuilder.addFromProperties(props, null);
+
+    String appPath = app.getClass().getName().replace(".", "/") + ".class";
+
+    LogicalPlan dag = new LogicalPlan();
+    dagBuilder.prepareDAG(dag, app, appPath);
+
+    simpleNameOutputPortAssertHelper(attributeObj,
+                                     dag,
+                                     "operator1",
+                                     val,
+                                     set);
+
+    simpleNameOutputPortAssertHelper(attributeObj,
+                                     dag,
+                                     "operator2",
+                                     val,
+                                     set);
+
+    simpleNameOutputPortAssertHelper(attributeObj,
+                                     dag,
+                                     "operator3",
+                                     val,
+                                     set);
+  }
+
+  /**
+   * Checks one operator's input ports and then its output ports against the same expectation.
+   *
+   * @param attributeObj attribute whose value is inspected
+   * @param dag logical plan the operator is looked up in
+   * @param operatorName name used to look up the operator
+   * @param queueCapacity value the port attribute is compared against
+   * @param set true to require equality, false to require inequality
+   */
+  private void simpleNamePortAssertHelper(Attribute<?> attributeObj,
+                                          LogicalPlan dag,
+                                          String operatorName,
+                                          Object queueCapacity,
+                                          boolean set)
+  {
+    simpleNameInputPortAssertHelper(attributeObj, dag, operatorName, queueCapacity, set);
+    simpleNameOutputPortAssertHelper(attributeObj, dag, operatorName, queueCapacity, set);
+  }
+
+  private void simpleNameInputPortAssertHelper(Attribute<?> attributeObj,
+                                               LogicalPlan dag,
+                                               String operatorName,
+                                               Object queueCapacity,
+                                               boolean set)
+  {
+    OperatorMeta operatorMeta = dag.getOperatorMeta(operatorName);
+
+    for (InputPortMeta inputPortMeta: operatorMeta.getInputStreams().keySet()) {
+      if (set) {
+        Assert.assertEquals(queueCapacity, inputPortMeta.getValue(attributeObj));
+      } else {
+        Assert.assertNotEquals(queueCapacity, inputPortMeta.getValue(attributeObj));
+      }
+    }
+  }
+
+  private void simpleNameOutputPortAssertHelper(Attribute<?> attributeObj,
+                                                LogicalPlan dag,
+                                                String operatorName,
+                                                Object queueCapacity,
+                                                boolean set)
+  {
+    OperatorMeta operatorMeta = dag.getOperatorMeta(operatorName);
+
+    for (OutputPortMeta outputPortMeta: operatorMeta.getOutputStreams().keySet()) {
+      if (set) {
+        Assert.assertEquals(queueCapacity, outputPortMeta.getValue(attributeObj));
+      } else {
+        Assert.assertNotEquals(queueCapacity, outputPortMeta.getValue(attributeObj));
+      }
+    }
+  }
+
+  /* Helpers for building properties */
+  private Properties propertiesBuilder(String attributeName,
+                                       String root,
+                                       String contextClass,
+                                       Object val)
+  {
+    boolean simpleName = contextClass == null;
+
+    if (!simpleName) {
+      attributeName = contextClass
+                      + LogicalPlanConfiguration.KEY_SEPARATOR
+                      + attributeName;
+    }
+
+    Properties props = new Properties();
+
+    String propName = root
+                      + StramElement.ATTR.getValue()
+                      + LogicalPlanConfiguration.KEY_SEPARATOR
+                      + attributeName;
+
+    LOG.debug("adding prop {} with value {}", propName, val.toString());
+
+    props.put(propName,
+              val.toString());
+
+    return props;
+  }
+
+  private Properties propertiesBuilderOperator(String attributeName,
+                                               String root,
+                                               boolean simpleName,
+                                               Object val,
+                                               boolean addOperator)
+  {
+    String contextClass = simpleName ? null : OperatorContext.class.getCanonicalName();
+
+    if (addOperator) {
+      root += "operator"
+              + LogicalPlanConfiguration.KEY_SEPARATOR
+              + "*"
+              + LogicalPlanConfiguration.KEY_SEPARATOR;
+    }
+
+    return propertiesBuilder(attributeName,
+                             root,
+                             contextClass,
+                             val);
+  }
+
+  private Properties propertiesBuilderPort(String attributeName,
+                                           String root,
+                                           boolean simpleName,
+                                           Object val,
+                                           boolean addPort)
+  {
+    String contextClass = simpleName ? null : PortContext.class.getCanonicalName();
+
+    if (addPort) {
+      root += "port"
+              + LogicalPlanConfiguration.KEY_SEPARATOR
+              + "*"
+              + LogicalPlanConfiguration.KEY_SEPARATOR;
+    }
+
+    return propertiesBuilder(attributeName,
+                             root,
+                             contextClass,
+                             val);
+  }
+
+  private Properties propertiesBuilderInputPort(String attributeName,
+                                                String root,
+                                                boolean simpleName,
+                                                Object val)
+  {
+    String contextClass = simpleName ? null: PortContext.class.getCanonicalName();
+
+    root += "inputport" +
+            LogicalPlanConfiguration.KEY_SEPARATOR +
+            "*" +
+            LogicalPlanConfiguration.KEY_SEPARATOR;
+
+    return propertiesBuilder(attributeName,
+                             root,
+                             contextClass,
+                             val);
+  }
+
+  private Properties propertiesBuilderOutputPort(String attributeName,
+                                                 String root,
+                                                 boolean simpleName,
+                                                 Object val)
+  {
+    String contextClass = simpleName ? null: PortContext.class.getCanonicalName();
+
+    root += "outputport" +
+            LogicalPlanConfiguration.KEY_SEPARATOR +
+            "*" +
+            LogicalPlanConfiguration.KEY_SEPARATOR;
+
+    return propertiesBuilder(attributeName,
+                             root,
+                             contextClass,
+                             val);
+  }
+
   private static final Logger logger = LoggerFactory.getLogger(LogicalPlanConfigurationTest.class);
 
   public static class TestApplication implements StreamingApplication {
@@ -818,11 +1707,11 @@ public class LogicalPlanConfigurationTest {
       }
     }
   }
-  
+
   public static class TestStatsListener implements StatsListener{
-    
+
     private int intProp;
-    
+
     public TestStatsListener()
     {
     }
@@ -866,11 +1755,34 @@ public class LogicalPlanConfigurationTest {
         return false;
       return true;
     }
-
   }
 
   public static class TestSchema
   {
   }
+
+  /** Test application: operator1 feeds operator2 (stream s1), operator2 feeds both inports of operator3 (s2). */
+  public static class SimpleTestApplication implements StreamingApplication
+  {
+    public final GenericTestOperator gt1 = new GenericTestOperator();
+    public final GenericTestOperator gt2 = new GenericTestOperator();
+    public final GenericTestOperator gt3 = new GenericTestOperator();
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      dag.addOperator("operator1", gt1);
+      dag.addOperator("operator2", gt2);
+      dag.addOperator("operator3", gt3);
+      dag.addStream("s1", gt1.outport1, gt2.inport1);
+      dag.addStream("s2", gt2.outport1, gt3.inport1, gt3.inport2);
+    }
+  }
+
+  // Same topology as SimpleTestApplication, annotated with the application name "SimpleTestApp".
+  @ApplicationAnnotation(name = "SimpleTestApp")
+  public static class SimpleTestApplicationWithName extends SimpleTestApplication {
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(LogicalPlanConfigurationTest.class);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/434a7170/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java
index ac05bed..78173d8 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java
@@ -19,6 +19,7 @@ import com.datatorrent.common.util.BaseOperator;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Serializable;
+
 import java.util.*;
 
 import javax.validation.*;
@@ -50,6 +51,7 @@ import com.datatorrent.stram.engine.TestGeneratorInputOperator;
 import com.datatorrent.stram.engine.TestNonOptionalOutportInputOperator;
 import com.datatorrent.stram.engine.TestOutputOperator;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
 import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/434a7170/engine/src/test/java/com/datatorrent/stram/plan/logical/MockStorageAgent.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/MockStorageAgent.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/MockStorageAgent.java
new file mode 100644
index 0000000..3975e02
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/MockStorageAgent.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.stram.plan.logical;
+
+import com.datatorrent.api.StorageAgent;
+import java.io.IOException;
+
+/** StorageAgent stub: every storage call throws UnsupportedOperationException; all instances are equal. */
+public class MockStorageAgent implements StorageAgent
+{
+  @Override
+  public void save(Object object, int operatorId, long windowId) throws IOException
+  {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Object load(int operatorId, long windowId) throws IOException
+  {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public void delete(int operatorId, long windowId) throws IOException
+  {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public long[] getWindowIds(int operatorId) throws IOException
+  {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String toString()
+  {
+    return MockStorageAgent.class.getCanonicalName();
+  }
+
+  // equals/hashCode: instanceof is false for null, and equal instances must share one hash code.
+  @Override
+  public boolean equals(Object obj)
+  {
+    return obj instanceof MockStorageAgent;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return MockStorageAgent.class.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/434a7170/engine/src/test/resources/testTopology.json
----------------------------------------------------------------------
diff --git a/engine/src/test/resources/testTopology.json b/engine/src/test/resources/testTopology.json
index 1ea9756..62c5262 100644
--- a/engine/src/test/resources/testTopology.json
+++ b/engine/src/test/resources/testTopology.json
@@ -13,7 +13,7 @@
         "STATS_LISTENERS" : {
           "java.util.ArrayList" : [
             {
-              "com.datatorrent.stram.plan.LogicalPlanConfigurationTest$TestStatsListener" : {
+              "com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest$TestStatsListener" : {
                 "intProp" : 222
               }
             }
@@ -38,7 +38,7 @@
         "com.datatorrent.stram.engine.GenericTestOperator":{
           "myStringProperty": "myStringPropertyValue"
         }
-        
+
       }
     },
     {


Mime
View raw message