apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [11/50] incubator-apex-core git commit: APEXCORE-105 Introduce module meta Inject properties through xml file on modules.
Date Fri, 29 Jan 2016 07:21:30 GMT
APEXCORE-105 Introduce module meta
Inject properties through xml file on modules.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b0360d45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b0360d45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b0360d45

Branch: refs/heads/master
Commit: b0360d45a361e462124db9c3000977987ca830e6
Parents: 2f1e1df
Author: Tushar R. Gosavi <tushar@apache.org>
Authored: Tue Oct 6 13:48:53 2015 +0530
Committer: Tushar R. Gosavi <tushar@apache.org>
Committed: Tue Dec 22 01:16:51 2015 +0530

----------------------------------------------------------------------
 api/src/main/java/com/datatorrent/api/DAG.java  |   2 +
 .../stram/codec/LogicalPlanSerializer.java      |   2 +-
 .../stram/plan/logical/LogicalPlan.java         | 120 ++++++++++-
 .../plan/logical/LogicalPlanConfiguration.java  |  99 +++++++--
 .../stram/webapp/StramWebServices.java          |   2 +-
 .../logical/module/TestModuleProperties.java    |  58 +++++
 .../stram/plan/logical/module/TestModules.java  | 216 +++++++++++++++++++
 7 files changed, 472 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b0360d45/api/src/main/java/com/datatorrent/api/DAG.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java
index abe2954..1dce402 100644
--- a/api/src/main/java/com/datatorrent/api/DAG.java
+++ b/api/src/main/java/com/datatorrent/api/DAG.java
@@ -164,6 +164,8 @@ public interface DAG extends DAGContext, Serializable
   {
     String getName();
 
+    Module getModule();
+
     InputPortMeta getMeta(Operator.InputPort<?> port);
 
     OutputPortMeta getMeta(Operator.OutputPort<?> port);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b0360d45/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
index 9e5ac04..90dd2b5 100644
--- a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
+++ b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
@@ -212,7 +212,7 @@ public class LogicalPlanSerializer extends JsonSerializer<LogicalPlan>
       String operatorKey = LogicalPlanConfiguration.OPERATOR_PREFIX + operatorMeta.getName();
       Operator operator = operatorMeta.getOperator();
       props.setProperty(operatorKey + "." + LogicalPlanConfiguration.OPERATOR_CLASSNAME,
operator.getClass().getName());
-      BeanMap operatorProperties = LogicalPlanConfiguration.getOperatorProperties(operator);
+      BeanMap operatorProperties = LogicalPlanConfiguration.getObjectProperties(operator);
       @SuppressWarnings("rawtypes")
       Iterator entryIterator = operatorProperties.entryIterator();
       while (entryIterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b0360d45/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index cca45d8..53e81bc 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import com.datatorrent.api.*;
@@ -48,7 +49,6 @@ import com.datatorrent.api.Operator.Unifier;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-
 import com.datatorrent.common.experimental.AppData;
 import com.datatorrent.common.metric.MetricsAggregator;
 import com.datatorrent.common.metric.SingleMetricAggregator;
@@ -1067,7 +1067,7 @@ public class LogicalPlan implements Serializable, DAG
   public <T extends Operator> T addOperator(String name, T operator)
   {
     if (operators.containsKey(name)) {
-      if (operators.get(name) == (Object)operator) {
+      if (operators.get(name).operator == operator) {
         return operator;
       }
       throw new IllegalArgumentException("duplicate operator id: " + operators.get(name));
@@ -1079,16 +1079,109 @@ public class LogicalPlan implements Serializable, DAG
     return operator;
   }
 
-  @Override
-  public <T extends Module> T addModule(String name, Class<T> moduleClass)
+  public final class ModuleMeta implements DAG.ModuleMeta, Serializable
   {
-    throw new UnsupportedOperationException("Modules are not supported");
+    private final LinkedHashMap<InputPortMeta, StreamMeta> inputStreams = new LinkedHashMap<>();
+    private final LinkedHashMap<OutputPortMeta, StreamMeta> outputStreams = new LinkedHashMap<>();
+    private final Attribute.AttributeMap attributes;
+    @SuppressWarnings("unused")
+    private final int id;
+    @NotNull
+    private final String name;
+    private transient Integer nindex; // for cycle detection
+    private transient Integer lowlink; // for cycle detection
+    private transient Module module;
+
+    public ModuleMeta(String name, Module module)
+    {
+      this(name, module, new DefaultAttributeMap());
+    }
+
+    public ModuleMeta(String name, Module module, DefaultAttributeMap attributeMap)
+    {
+      LOG.debug("Initializing {} as {}", name, module.getClass().getName());
+      this.name = name;
+      this.module = module;
+      this.id = logicalOperatorSequencer.decrementAndGet();
+      this.attributes = attributeMap;
+    }
+
+    @Override
+    public String getName()
+    {
+      return name;
+    }
+
+    @Override
+    public Module getModule()
+    {
+      return module;
+    }
+
+    @Override
+    public DAG.InputPortMeta getMeta(InputPort<?> port)
+    {
+      return null;
+    }
+
+    @Override
+    public DAG.OutputPortMeta getMeta(OutputPort<?> port)
+    {
+      return null;
+    }
+
+    @Override
+    public Attribute.AttributeMap getAttributes()
+    {
+      return null;
+    }
+
+    @Override
+    public <T> T getValue(Attribute<T> key)
+    {
+      return null;
+    }
+
+    @Override
+    public void setCounters(Object counters)
+    {
+
+    }
+
+    @Override
+    public void sendMetrics(Collection<String> metricNames)
+    {
+
+    }
   }
 
+  public transient Map<String, ModuleMeta> modules = Maps.newHashMap();
+
   @Override
   public <T extends Module> T addModule(String name, T module)
   {
-    throw new UnsupportedOperationException("Modules are not supported");
+    if (modules.containsKey(name)) {
+      if (modules.get(name).module == module) {
+        return module;
+      }
+      throw new IllegalArgumentException("duplicate module is: " + modules.get(name));
+    }
+    ModuleMeta meta = new ModuleMeta(name, module);
+    modules.put(name, meta);
+    return module;
+  }
+
+  @Override
+  public <T extends Module> T addModule(String name, Class<T> clazz)
+  {
+    T instance;
+    try {
+      instance = clazz.newInstance();
+    } catch (Exception ex) {
+      throw new IllegalArgumentException(ex);
+    }
+    addModule(name, instance);
+    return instance;
   }
 
   public void removeOperator(Operator operator)
@@ -1231,6 +1324,10 @@ public class LogicalPlan implements Serializable, DAG
     return Collections.unmodifiableCollection(this.operators.values());
   }
 
+  public Collection<ModuleMeta> getAllModules() {
+    return Collections.unmodifiableCollection(this.modules.values());
+  }
+
   public Collection<StreamMeta> getAllStreams()
   {
     return Collections.unmodifiableCollection(this.streams.values());
@@ -1242,10 +1339,9 @@ public class LogicalPlan implements Serializable, DAG
     return this.operators.get(operatorName);
   }
 
-  @Override
   public ModuleMeta getModuleMeta(String moduleName)
   {
-    throw new UnsupportedOperationException("Modules are not supported");
+    return null;
   }
 
   @Override
@@ -1260,10 +1356,14 @@ public class LogicalPlan implements Serializable, DAG
     throw new IllegalArgumentException("Operator not associated with the DAG: " + operator);
   }
 
-  @Override
   public ModuleMeta getMeta(Module module)
   {
-    throw new UnsupportedOperationException("Modules are not supported");
+    for (ModuleMeta m : getAllModules()) {
+      if (m.module == module) {
+        return m;
+      }
+    }
+    throw new IllegalArgumentException("Module not associated with the DAG: " + module);
   }
 
   public int getMaxContainerCount()

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b0360d45/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index 60bbdbe..9bbe85c 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -18,7 +18,6 @@
  */
 package com.datatorrent.stram.plan.logical;
 
-
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -32,7 +31,6 @@ import java.lang.reflect.Type;
 import java.util.*;
 import java.util.Map.Entry;
 
-
 import javax.validation.ValidationException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -47,6 +45,7 @@ import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.commons.beanutils.BeanMap;
 import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.collections.CollectionUtils;
@@ -61,10 +60,10 @@ import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
-
 import com.datatorrent.stram.StramUtils;
 import com.datatorrent.stram.client.StramClientUtils;
 import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlan.ModuleMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
@@ -2114,6 +2113,10 @@ public class LogicalPlanConfiguration {
     if (dag.getAttributes().get(Context.DAGContext.APPLICATION_NAME) == null) {
       dag.setAttribute(Context.DAGContext.APPLICATION_NAME, appName);
     }
+
+    // Expand the modules within the dag recursively
+    setModuleProperties(dag, appName);
+
     // inject external operator configuration
     setOperatorConfiguration(dag, appConfs, appName);
     setStreamConfiguration(dag, appConfs, appName);
@@ -2138,7 +2141,7 @@ public class LogicalPlanConfiguration {
   public Map<String, String> getProperties(OperatorMeta ow, String appName) {
     List<AppConf> appConfs = stramConf.getMatchingChildConf(appName, StramElement.APPLICATION);
     List<OperatorConf> opConfs = getMatchingChildConf(appConfs, ow.getName(), StramElement.OPERATOR);
-    return getProperties(ow, opConfs, appName);
+    return getProperties(getPropertyArgs(ow), opConfs, appName);
   }
 
   private Map<String,String> getApplicationProperties(List<AppConf> appConfs){
@@ -2153,17 +2156,17 @@ public class LogicalPlanConfiguration {
   /**
    * Get the configuration opProps for the given operator.
    * These can be operator specific settings or settings from matching templates.
-   * @param ow
+   * @param pa
    * @param opConfs
    * @param appName
    */
-  private Map<String, String> getProperties(OperatorMeta ow, List<OperatorConf>
opConfs, String appName)
+  private Map<String, String> getProperties(PropertyArgs pa, List<OperatorConf>
opConfs, String appName)
   {
     Map<String, String> opProps = Maps.newHashMap();
     Map<String, TemplateConf> templates = stramConf.getChildren(StramElement.TEMPLATE);
     // list of all templates that match operator, ordered by priority
     if (!templates.isEmpty()) {
-      TreeMap<Integer, TemplateConf> matchingTemplates = getMatchingTemplates(ow, appName,
templates);
+      TreeMap<Integer, TemplateConf> matchingTemplates = getMatchingTemplates(pa, appName,
templates);
       if (matchingTemplates != null && !matchingTemplates.isEmpty()) {
         // combined map of prioritized template settings
         for (TemplateConf t : matchingTemplates.descendingMap().values()) {
@@ -2197,23 +2200,46 @@ public class LogicalPlanConfiguration {
     return refTemplates;
   }
 
+  private static class PropertyArgs
+  {
+    String name;
+    String className;
+
+    public PropertyArgs(String name, String className)
+    {
+      this.name = name;
+      this.className = className;
+    }
+  }
+
+  private PropertyArgs getPropertyArgs(OperatorMeta om)
+  {
+    return new PropertyArgs(om.getName(), om.getOperator().getClass().getName());
+  }
+
+  private PropertyArgs getPropertyArgs(ModuleMeta mm)
+  {
+    return new PropertyArgs(mm.getName(), mm.getModule().getClass().getName());
+  }
+
   /**
    * Produce the collections of templates that apply for the given id.
-   * @param ow
+   * @param pa
    * @param appName
    * @param templates
    * @return TreeMap<Integer, TemplateConf>
    */
-  private TreeMap<Integer, TemplateConf> getMatchingTemplates(OperatorMeta ow, String
appName, Map<String, TemplateConf> templates) {
+  private TreeMap<Integer, TemplateConf> getMatchingTemplates(PropertyArgs pa, String
appName, Map<String, TemplateConf> templates)
+  {
     TreeMap<Integer, TemplateConf> tm = Maps.newTreeMap();
     for (TemplateConf t : templates.values()) {
-      if ((t.idRegExp != null && ow.getName().matches(t.idRegExp))) {
+      if ((t.idRegExp != null && pa.name.matches(t.idRegExp))) {
         tm.put(1, t);
       } else if (appName != null && t.appNameRegExp != null
           && appName.matches(t.appNameRegExp)) {
         tm.put(2, t);
       } else if (t.classNameRegExp != null
-          && ow.getOperator().getClass().getName().matches(t.classNameRegExp)) {
+          && pa.className.matches(t.classNameRegExp)) {
         tm.put(3, t);
       }
     }
@@ -2238,6 +2264,26 @@ public class LogicalPlanConfiguration {
     }
   }
 
+  /**
+   * Generic helper function to inject properties on the object.
+   *
+   * @param obj
+   * @param properties
+   * @param <T>
+   * @return
+   */
+  public static <T> T setObjectProperties(T obj, Map<String, String> properties)
+  {
+    try {
+      BeanUtils.populate(obj, properties);
+      return obj;
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException("Error setting operator properties", e);
+    } catch (InvocationTargetException e) {
+      throw new IllegalArgumentException("Error setting operator properties", e);
+    }
+  }
+
   public static StreamingApplication setApplicationProperties(StreamingApplication application,
Map<String, String> properties)
   {
     try {
@@ -2249,9 +2295,9 @@ public class LogicalPlanConfiguration {
     }
   }
 
-  public static BeanMap getOperatorProperties(Operator operator)
+  public static BeanMap getObjectProperties(Object obj)
   {
-    return new BeanMap(operator);
+    return new BeanMap(obj);
   }
 
   /**
@@ -2266,12 +2312,26 @@ public class LogicalPlanConfiguration {
     List<AppConf> appConfs = stramConf.getMatchingChildConf(applicationName, StramElement.APPLICATION);
     for (OperatorMeta ow : dag.getAllOperators()) {
       List<OperatorConf> opConfs = getMatchingChildConf(appConfs, ow.getName(), StramElement.OPERATOR);
-      Map<String, String> opProps = getProperties(ow, opConfs, applicationName);
+      Map<String, String> opProps = getProperties(getPropertyArgs(ow), opConfs, applicationName);
       setOperatorProperties(ow.getOperator(), opProps);
     }
   }
 
   /**
+   * Set any properties from configuration on the modules in the DAG. This
+   * method may throw unchecked exception if the configuration contains
+   * properties that are invalid for a module.
+   *
+   * @param dag
+   * @param applicationName
+   */
+  public void setModuleProperties(LogicalPlan dag, String applicationName)
+  {
+    List<AppConf> appConfs = stramConf.getMatchingChildConf(applicationName, StramElement.APPLICATION);
+    setModuleConfiguration(dag, appConfs, applicationName);
+  }
+
+  /**
    * Set the application configuration.
    * @param dag
    * @param appName
@@ -2298,7 +2358,7 @@ public class LogicalPlanConfiguration {
       // Set the operator attributes
       setAttributes(opConfs, ow.getAttributes());
       // Set the operator opProps
-      Map<String, String> opProps = getProperties(ow, opConfs, appName);
+      Map<String, String> opProps = getProperties(getPropertyArgs(ow), opConfs, appName);
       setOperatorProperties(ow.getOperator(), opProps);
 
       // Set the port attributes
@@ -2327,6 +2387,15 @@ public class LogicalPlanConfiguration {
     }
   }
 
+  private void setModuleConfiguration(final LogicalPlan dag, List<AppConf> appConfs,
String appName)
+  {
+    for (final ModuleMeta mw : dag.getAllModules()) {
+      List<OperatorConf> opConfs = getMatchingChildConf(appConfs, mw.getName(), StramElement.OPERATOR);
+      Map<String, String> opProps = getProperties(getPropertyArgs(mw), opConfs, appName);
+      setObjectProperties(mw.getModule(), opProps);
+    }
+  }
+
   private void setStreamConfiguration(LogicalPlan dag, List<AppConf> appConfs, String
appAlias) {
     for (StreamMeta sm : dag.getAllStreams()) {
       List<StreamConf> smConfs = getMatchingChildConf(appConfs, sm.getName(), StramElement.STREAM);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b0360d45/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
index de085cd..6fdba00 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
@@ -715,7 +715,7 @@ public class StramWebServices
       throw new NotFoundException();
     }
 
-    BeanMap operatorProperties = LogicalPlanConfiguration.getOperatorProperties(logicalOperator.getOperator());
+    BeanMap operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalOperator.getOperator());
     Map<String, Object> m = new HashMap<String, Object>();
     @SuppressWarnings("rawtypes")
     Iterator entryIterator = operatorProperties.entryIterator();

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b0360d45/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleProperties.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleProperties.java
b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleProperties.java
new file mode 100644
index 0000000..7951e26
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleProperties.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.plan.logical.module;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
+
+public class TestModuleProperties
+{
+  @Test
+  public void testModuleProperties()
+  {
+    Configuration conf = new Configuration(false);
+    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.myStringProperty", "myStringPropertyValue");
+    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.stringArrayField", "a,b,c");
+    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty.key1", "key1Val");
+    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty(key1.dot)", "key1dotVal");
+    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty(key2.dot)", "key2dotVal");
+
+    LogicalPlan dag = new LogicalPlan();
+    TestModules.GenericModule o1 = dag.addModule("o1", new TestModules.GenericModule());
+    TestModules.ValidationTestModule o2 = dag.addModule("o2", new TestModules.ValidationTestModule());
+
+    LogicalPlanConfiguration pb = new LogicalPlanConfiguration(conf);
+
+    pb.setModuleProperties(dag, "testSetOperatorProperties");
+    Assert.assertEquals("o1.myStringProperty", "myStringPropertyValue", o1.getMyStringProperty());
+    Assert.assertArrayEquals("o2.stringArrayField", new String[]{"a", "b", "c"}, o2.getStringArrayField());
+
+    Assert.assertEquals("o2.mapProperty.key1", "key1Val", o2.getMapProperty().get("key1"));
+    Assert.assertEquals("o2.mapProperty(key1.dot)", "key1dotVal", o2.getMapProperty().get("key1.dot"));
+    Assert.assertEquals("o2.mapProperty(key2.dot)", "key2dotVal", o2.getMapProperty().get("key2.dot"));
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b0360d45/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java
b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java
new file mode 100644
index 0000000..8fad613
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.plan.logical.module;
+
+import java.util.Map;
+
+import javax.validation.Valid;
+import javax.validation.constraints.AssertTrue;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Module;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.stram.engine.GenericOperatorProperty;
+
+public class TestModules
+{
+
+  public static class GenericModule implements Module
+  {
+    private static final Logger LOG = LoggerFactory.getLogger(TestModules.class);
+
+    public volatile Object inport1Tuple = null;
+
+    @OutputPortFieldAnnotation(optional = true)
+    public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
+
+    @OutputPortFieldAnnotation(optional = true)
+    public final transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<Object>();
+
+    private String emitFormat;
+
+    public boolean booleanProperty;
+
+    private String myStringProperty;
+
+    private transient GenericOperatorProperty genericOperatorProperty = new GenericOperatorProperty("test");
+
+    public String getMyStringProperty()
+    {
+      return myStringProperty;
+    }
+
+    public void setMyStringProperty(String myStringProperty)
+    {
+      this.myStringProperty = myStringProperty;
+    }
+
+    public boolean isBooleanProperty()
+    {
+      return booleanProperty;
+    }
+
+    public void setBooleanProperty(boolean booleanProperty)
+    {
+      this.booleanProperty = booleanProperty;
+    }
+
+    public String propertySetterOnly;
+
+    /**
+     * setter w/o getter defined
+     *
+     * @param v
+     */
+    public void setStringPropertySetterOnly(String v)
+    {
+      this.propertySetterOnly = v;
+    }
+
+    public String getEmitFormat()
+    {
+      return emitFormat;
+    }
+
+    public void setEmitFormat(String emitFormat)
+    {
+      this.emitFormat = emitFormat;
+    }
+
+    public GenericOperatorProperty getGenericOperatorProperty()
+    {
+      return genericOperatorProperty;
+    }
+
+    public void setGenericOperatorProperty(GenericOperatorProperty genericOperatorProperty)
+    {
+      this.genericOperatorProperty = genericOperatorProperty;
+    }
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      LOG.debug("populateDAG of module called");
+    }
+  }
+
+  public static class ValidationTestModule implements Module
+  {
+    @NotNull
+    @Pattern(regexp = ".*malhar.*", message = "Value has to contain 'malhar'!")
+    private String stringField1;
+
+    @Min(2)
+    private int intField1;
+
+    @AssertTrue(message = "stringField1 should end with intField1")
+    private boolean isValidConfiguration()
+    {
+      return stringField1.endsWith(String.valueOf(intField1));
+    }
+
+    private String getterProperty2 = "";
+
+    @NotNull
+    public String getProperty2()
+    {
+      return getterProperty2;
+    }
+
+    public void setProperty2(String s)
+    {
+      // annotations need to be on the getter
+      getterProperty2 = s;
+    }
+
+    private String[] stringArrayField;
+
+    public String[] getStringArrayField()
+    {
+      return stringArrayField;
+    }
+
+    public void setStringArrayField(String[] stringArrayField)
+    {
+      this.stringArrayField = stringArrayField;
+    }
+
+    public class Nested
+    {
+      @NotNull
+      private String property = "";
+
+      public String getProperty()
+      {
+        return property;
+      }
+
+      public void setProperty(String property)
+      {
+        this.property = property;
+      }
+
+    }
+
+    @Valid
+    private final Nested nestedBean = new Nested();
+
+    private String stringProperty2;
+
+    public String getStringProperty2()
+    {
+      return stringProperty2;
+    }
+
+    public void setStringProperty2(String stringProperty2)
+    {
+      this.stringProperty2 = stringProperty2;
+    }
+
+    private Map<String, String> mapProperty = Maps.newHashMap();
+
+    public Map<String, String> getMapProperty()
+    {
+      return mapProperty;
+    }
+
+    public void setMapProperty(Map<String, String> mapProperty)
+    {
+      this.mapProperty = mapProperty;
+    }
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+
+    }
+  }
+
+}


Mime
View raw message