apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [13/50] incubator-apex-core git commit: APEXCORE-194 Added support for proxy ports Added test cases.
Date Fri, 29 Jan 2016 07:21:32 GMT
APEXCORE-194 Added support for proxy ports
Added test cases.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/c1314eaf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/c1314eaf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/c1314eaf

Branch: refs/heads/master
Commit: c1314eafaac239b420d085a4584d5c5acaf3e69b
Parents: 14a09bb
Author: bhupeshchawda <bhupeshchawda@gmail.com>
Authored: Tue Oct 6 12:34:24 2015 +0530
Committer: Tushar R. Gosavi <tushar@apache.org>
Committed: Tue Dec 22 02:04:18 2015 +0530

----------------------------------------------------------------------
 .../main/java/com/datatorrent/api/Module.java   | 120 ++++
 .../stram/plan/logical/LogicalPlan.java         |  51 +-
 .../plan/logical/LogicalPlanConfiguration.java  |   1 +
 .../plan/logical/module/ModuleAppTest.java      | 168 ++++++
 .../logical/module/TestModuleExpansion.java     | 552 +++++++++++++++++++
 5 files changed, 888 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/c1314eaf/api/src/main/java/com/datatorrent/api/Module.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Module.java b/api/src/main/java/com/datatorrent/api/Module.java
index 1220fc1..67682e7 100644
--- a/api/src/main/java/com/datatorrent/api/Module.java
+++ b/api/src/main/java/com/datatorrent/api/Module.java
@@ -21,8 +21,128 @@ package com.datatorrent.api;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.Operator.OutputPort;
+import com.datatorrent.api.Operator.Unifier;
+
+/**
+ * A Module is a component which can be added to the DAG, similar to an operator,
+ * using the addModule API. The module should implement the populateDAG method, which
+ * will be called by the platform; the DAG populated by the module then
+ * replaces the module in place.
+ *
+ */
 @InterfaceStability.Evolving
 public interface Module
 {
   void populateDAG(DAG dag, Configuration conf);
+
+  /**
+   * These ports allow the platform to short-circuit a module port to an operator port, i.e. when a module is expanded, it can
+   * specify which operator's port is used to replace the module port in the final DAG.
+   *
+   * @param <T> data type accepted at the input port.
+   */
+  final class ProxyInputPort<T> implements InputPort<T>
+  {
+    InputPort<T> inputPort;
+
+    public void set(InputPort<T> port)
+    {
+      inputPort = port;
+    }
+
+    public InputPort<T> get()
+    {
+      return inputPort;
+    }
+
+    @Override
+    public void setup(PortContext context)
+    {
+      if (inputPort != null) {
+        inputPort.setup(context);
+      }
+    }
+
+    @Override
+    public void teardown()
+    {
+      if (inputPort != null) {
+        inputPort.teardown();
+      }
+    }
+
+    @Override
+    public Sink<T> getSink()
+    {
+      return inputPort == null ? null : inputPort.getSink();
+    }
+
+    @Override
+    public void setConnected(boolean connected)
+    {
+      if (inputPort != null) {
+        inputPort.setConnected(connected);
+      }
+    }
+
+    @Override
+    public StreamCodec<T> getStreamCodec()
+    {
+      return inputPort == null ? null : inputPort.getStreamCodec();
+    }
+  }
+
+  /**
+   * Similar to ProxyInputPort, but on the output side.
+   *
+   * @param <T> data type emitted on the port.
+   */
+  final class ProxyOutputPort<T> implements OutputPort<T>
+  {
+    OutputPort<T> outputPort;
+
+    public void set(OutputPort<T> port)
+    {
+      outputPort = port;
+    }
+
+    public OutputPort<T> get()
+    {
+      return outputPort;
+    }
+
+    @Override
+    public void setup(PortContext context)
+    {
+      if (outputPort != null) {
+        outputPort.setup(context);
+      }
+    }
+
+    @Override
+    public void teardown()
+    {
+      if (outputPort != null) {
+        outputPort.teardown();
+      }
+    }
+
+    @Override
+    public void setSink(Sink<Object> s)
+    {
+      if (outputPort != null) {
+        outputPort.setSink(s);
+      }
+    }
+
+    @Override
+    public Unifier<T> getUnifier()
+    {
+      return outputPort == null ? null : outputPort.getUnifier();
+    }
+  }
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/c1314eaf/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 5a3e167..21039cc 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -37,12 +37,14 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Sets;
 
 import com.datatorrent.api.*;
 import com.datatorrent.api.Attribute.AttributeMap;
 import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
+import com.datatorrent.api.Module.ProxyInputPort;
+import com.datatorrent.api.Module.ProxyOutputPort;
 import com.datatorrent.api.Operator.InputPort;
 import com.datatorrent.api.Operator.OutputPort;
 import com.datatorrent.api.Operator.Unifier;
@@ -152,6 +154,7 @@ public class LogicalPlan implements Serializable, DAG
   private final Attribute.AttributeMap attributes = new DefaultAttributeMap();
   private transient int nodeIndex = 0; // used for cycle validation
   private transient Stack<OperatorMeta> stack = new Stack<OperatorMeta>(); //
used for cycle validation
+  private transient Map<String, ArrayListMultimap<OutputPort<?>, InputPort<?>>>
streamLinks = new HashMap<>();
 
   @Override
   public Attribute.AttributeMap getAttributes()
@@ -1197,6 +1200,7 @@ public class LogicalPlan implements Serializable, DAG
         subModuleMeta.setParent(this);
         subModuleMeta.flattenModule(dag, conf);
       }
+      dag.applyStreamLinks();
       parentDAG.addDAGToCurrentDAG(this);
     }
 
@@ -1300,13 +1304,52 @@ public class LogicalPlan implements Serializable, DAG
   public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T>
source, Operator.InputPort<? super T>... sinks)
   {
     StreamMeta s = addStream(id);
-    s.setSource(source);
-    for (Operator.InputPort<?> sink: sinks) {
-      s.addSink(sink);
+    id = s.id;
+    ArrayListMultimap<OutputPort<?>, InputPort<?>> streamMap = ArrayListMultimap.create();
+    if (!(source instanceof ProxyOutputPort)) {
+      s.setSource(source);
+    }
+    for (Operator.InputPort<?> sink : sinks) {
+      if (source instanceof ProxyOutputPort || sink instanceof ProxyInputPort) {
+        streamMap.put(source, sink);
+        streamLinks.put(id, streamMap);
+      } else {
+        if (s.getSource() == null) {
+          s.setSource(source);
+        }
+        s.addSink(sink);
+      }
     }
     return s;
   }
 
+  /**
+   * This will be called once the logical DAG is expanded, and the proxy input and proxy output ports are populated with
+   * the actual ports that they refer to. This method adds sources and sinks for the StreamMeta objects which were left
+   * empty in the addStream call.
+   */
+  public void applyStreamLinks()
+  {
+    for (String id : streamLinks.keySet()) {
+      StreamMeta s = getStream(id);
+      for (Map.Entry<Operator.OutputPort<?>, Operator.InputPort<?>> pair
: streamLinks.get(id).entries()) {
+        if (s.getSource() == null) {
+          Operator.OutputPort<?> outputPort = pair.getKey();
+          while (outputPort instanceof ProxyOutputPort) {
+            outputPort = ((ProxyOutputPort<?>)outputPort).get();
+          }
+          s.setSource(outputPort);
+        }
+
+        Operator.InputPort<?> inputPort = pair.getValue();
+        while (inputPort instanceof ProxyInputPort) {
+          inputPort = ((ProxyInputPort<?>)inputPort).get();
+        }
+        s.addSink(inputPort);
+      }
+    }
+  }
+
   @SuppressWarnings({ "unchecked", "rawtypes" })
   private void addDAGToCurrentDAG(ModuleMeta moduleMeta)
   {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/c1314eaf/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index 6dc4c0c..483576a 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -2128,6 +2128,7 @@ public class LogicalPlanConfiguration {
     for (ModuleMeta moduleMeta : dag.getAllModules()) {
       moduleMeta.flattenModule(dag, conf);
     }
+    dag.applyStreamLinks();
   }
 
   public static Properties readProperties(String filePath) throws IOException

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/c1314eaf/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
new file mode 100644
index 0000000..97c015e
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.plan.logical.module;
+
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Module;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
+
+/**
+ * Unit tests for testing DAG expansion with modules and proxy port substitution.
+ */
+public class ModuleAppTest
+{
+
+  /*
+   * Input Operator - 1
+   */
+  static class DummyInputOperator extends BaseOperator implements InputOperator
+  {
+
+    Random r = new Random();
+    public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+
+    @Override
+    public void emitTuples()
+    {
+      output.emit(r.nextInt());
+    }
+  }
+
+  /*
+   * Input Operator - 1.1
+   */
+  static class DummyOperatorAfterInput extends BaseOperator
+  {
+
+    public transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>()
+    {
+      @Override
+      public void process(Integer tuple)
+      {
+        output.emit(tuple);
+      }
+    };
+    public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+  }
+
+  /*
+   * Operator - 2
+   */
+  static class DummyOperator extends BaseOperator
+  {
+    int prop;
+
+    public transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>()
+    {
+      @Override
+      public void process(Integer tuple)
+      {
+        LOG.debug(tuple.intValue() + " processed");
+        output.emit(tuple);
+      }
+    };
+    public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+  }
+
+  /*
+   * Output Operator - 3
+   */
+  static class DummyOutputOperator extends BaseOperator
+  {
+    int prop;
+
+    public transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>()
+    {
+      @Override
+      public void process(Integer tuple)
+      {
+        LOG.debug(tuple.intValue() + " processed");
+      }
+    };
+  }
+
+  /*
+   * Module Definition
+   */
+  static class TestModule implements Module
+  {
+
+    public transient ProxyInputPort<Integer> moduleInput = new Module.ProxyInputPort<Integer>();
+    public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<Integer>();
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      LOG.debug("Module - PopulateDAG");
+      DummyOperator dummyOperator = dag.addOperator("DummyOperator", new DummyOperator());
+      moduleInput.set(dummyOperator.input);
+      moduleOutput.set(dummyOperator.output);
+    }
+  }
+
+  static class Application implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      LOG.debug("Application - PopulateDAG");
+      DummyInputOperator dummyInputOperator = dag.addOperator("DummyInputOperator", new DummyInputOperator());
+      DummyOperatorAfterInput dummyOperatorAfterInput = dag.addOperator("DummyOperatorAfterInput",
+          new DummyOperatorAfterInput());
+      Module m1 = dag.addModule("TestModule1", new TestModule());
+      Module m2 = dag.addModule("TestModule2", new TestModule());
+      DummyOutputOperator dummyOutputOperator = dag.addOperator("DummyOutputOperator", new
DummyOutputOperator());
+      dag.addStream("Operator To Operator", dummyInputOperator.output, dummyOperatorAfterInput.input);
+      dag.addStream("Operator To Module", dummyOperatorAfterInput.output, ((TestModule)m1).moduleInput);
+      dag.addStream("Module To Module", ((TestModule)m1).moduleOutput, ((TestModule)m2).moduleInput);
+      dag.addStream("Module To Operator", ((TestModule)m2).moduleOutput, dummyOutputOperator.input);
+    }
+  }
+
+  @Test
+  public void validateTestApplication()
+  {
+    Configuration conf = new Configuration(false);
+    LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf);
+    LogicalPlan dag = new LogicalPlan();
+    lpc.prepareDAG(dag, new Application(), "TestApp");
+
+    Assert.assertEquals(2, dag.getAllModules().size(), 2);
+    Assert.assertEquals(5, dag.getAllOperators().size());
+    Assert.assertEquals(4, dag.getAllStreams().size());
+    dag.validate();
+  }
+
+  private static Logger LOG = LoggerFactory.getLogger(ModuleAppTest.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/c1314eaf/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
new file mode 100644
index 0000000..5bfd8f1
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
@@ -0,0 +1,552 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.plan.logical.module;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Module;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
+
+public class TestModuleExpansion
+{
+  static class DummyInputOperator extends BaseOperator implements InputOperator
+  {
+    private int inputOperatorProp = 0;
+
+    Random r = new Random();
+    public transient DefaultOutputPort<Integer> out = new DefaultOutputPort<Integer>();
+
+    @Override
+    public void emitTuples()
+    {
+      out.emit(r.nextInt());
+    }
+
+    public int getInputOperatorProp()
+    {
+      return inputOperatorProp;
+    }
+
+    public void setInputOperatorProp(int inputOperatorProp)
+    {
+      this.inputOperatorProp = inputOperatorProp;
+    }
+  }
+
+  static class DummyOperator extends BaseOperator
+  {
+    private int operatorProp = 0;
+
+    @OutputPortFieldAnnotation(optional = true)
+    public final transient DefaultOutputPort<Integer> out1 = new DefaultOutputPort<>();
+
+    @OutputPortFieldAnnotation(optional = true)
+    public final transient DefaultOutputPort<Integer> out2 = new DefaultOutputPort<>();
+
+    @InputPortFieldAnnotation(optional = true)
+    public final transient DefaultInputPort<Integer> in = new DefaultInputPort<Integer>()
+    {
+      @Override
+      public void process(Integer tuple)
+      {
+        out1.emit(tuple);
+        out2.emit(tuple);
+      }
+    };
+
+    public int getOperatorProp()
+    {
+      return operatorProp;
+    }
+
+    public void setOperatorProp(int operatorProp)
+    {
+      this.operatorProp = operatorProp;
+    }
+  }
+
+  static class Level1Module implements Module
+  {
+    private int level1ModuleProp = 0;
+
+    @InputPortFieldAnnotation(optional = true)
+    public final transient ProxyInputPort<Integer> mIn = new ProxyInputPort<>();
+    @OutputPortFieldAnnotation(optional = true)
+    public final transient ProxyOutputPort<Integer> mOut = new ProxyOutputPort<>();
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      DummyOperator o1 = dag.addOperator("O1", new DummyOperator());
+      o1.setOperatorProp(level1ModuleProp);
+      mIn.set(o1.in);
+      mOut.set(o1.out1);
+    }
+
+    public int getLevel1ModuleProp()
+    {
+      return level1ModuleProp;
+    }
+
+    public void setLevel1ModuleProp(int level1ModuleProp)
+    {
+      this.level1ModuleProp = level1ModuleProp;
+    }
+  }
+
+  static class Level2ModuleA implements Module
+  {
+    private int level2ModuleAProp1 = 0;
+    private int level2ModuleAProp2 = 0;
+    private int level2ModuleAProp3 = 0;
+
+    @InputPortFieldAnnotation(optional = true)
+    public final transient ProxyInputPort<Integer> mIn = new ProxyInputPort<>();
+
+    @OutputPortFieldAnnotation(optional = true)
+    public final transient ProxyOutputPort<Integer> mOut1 = new ProxyOutputPort<>();
+
+    @OutputPortFieldAnnotation(optional = true)
+    public final transient ProxyOutputPort<Integer> mOut2 = new ProxyOutputPort<>();
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      Level1Module m1 = dag.addModule("M1", new Level1Module());
+      m1.setLevel1ModuleProp(level2ModuleAProp1);
+
+      Level1Module m2 = dag.addModule("M2", new Level1Module());
+      m2.setLevel1ModuleProp(level2ModuleAProp2);
+
+      DummyOperator o1 = dag.addOperator("O1", new DummyOperator());
+      o1.setOperatorProp(level2ModuleAProp3);
+
+      dag.addStream("M1_M2&O1", m1.mOut, m2.mIn, o1.in);
+
+      mIn.set(m1.mIn);
+      mOut1.set(m2.mOut);
+      mOut2.set(o1.out1);
+    }
+
+    public int getLevel2ModuleAProp1()
+    {
+      return level2ModuleAProp1;
+    }
+
+    public void setLevel2ModuleAProp1(int level2ModuleAProp1)
+    {
+      this.level2ModuleAProp1 = level2ModuleAProp1;
+    }
+
+    public int getLevel2ModuleAProp2()
+    {
+      return level2ModuleAProp2;
+    }
+
+    public void setLevel2ModuleAProp2(int level2ModuleAProp2)
+    {
+      this.level2ModuleAProp2 = level2ModuleAProp2;
+    }
+
+    public int getLevel2ModuleAProp3()
+    {
+      return level2ModuleAProp3;
+    }
+
+    public void setLevel2ModuleAProp3(int level2ModuleAProp3)
+    {
+      this.level2ModuleAProp3 = level2ModuleAProp3;
+    }
+  }
+
+  static class Level2ModuleB implements Module
+  {
+    private int level2ModuleBProp1 = 0;
+    private int level2ModuleBProp2 = 0;
+    private int level2ModuleBProp3 = 0;
+
+    @InputPortFieldAnnotation(optional = true)
+    public final transient ProxyInputPort<Integer> mIn = new ProxyInputPort<>();
+
+    @OutputPortFieldAnnotation(optional = true)
+    public final transient ProxyOutputPort<Integer> mOut1 = new ProxyOutputPort<>();
+
+    @OutputPortFieldAnnotation(optional = true)
+    public final transient ProxyOutputPort<Integer> mOut2 = new ProxyOutputPort<>();
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      DummyOperator o1 = dag.addOperator("O1", new DummyOperator());
+      o1.setOperatorProp(level2ModuleBProp1);
+
+      Level1Module m1 = dag.addModule("M1", new Level1Module());
+      m1.setLevel1ModuleProp(level2ModuleBProp2);
+
+      DummyOperator o2 = dag.addOperator("O2", new DummyOperator());
+      o2.setOperatorProp(level2ModuleBProp3);
+
+      dag.addStream("O1_M1", o1.out1, m1.mIn);
+      dag.addStream("O1_O2", o1.out2, o2.in);
+
+      mIn.set(o1.in);
+      mOut1.set(m1.mOut);
+      mOut2.set(o2.out1);
+    }
+
+    public int getLevel2ModuleBProp1()
+    {
+      return level2ModuleBProp1;
+    }
+
+    public void setLevel2ModuleBProp1(int level2ModuleBProp1)
+    {
+      this.level2ModuleBProp1 = level2ModuleBProp1;
+    }
+
+    public int getLevel2ModuleBProp2()
+    {
+      return level2ModuleBProp2;
+    }
+
+    public void setLevel2ModuleBProp2(int level2ModuleBProp2)
+    {
+      this.level2ModuleBProp2 = level2ModuleBProp2;
+    }
+
+    public int getLevel2ModuleBProp3()
+    {
+      return level2ModuleBProp3;
+    }
+
+    public void setLevel2ModuleBProp3(int level2ModuleBProp3)
+    {
+      this.level2ModuleBProp3 = level2ModuleBProp3;
+    }
+  }
+
+  static class Level3Module implements Module
+  {
+
+    public final transient ProxyInputPort<Integer> mIn = new ProxyInputPort<>();
+    public final transient ProxyOutputPort<Integer> mOut1 = new ProxyOutputPort<>();
+    public final transient ProxyOutputPort<Integer> mOut2 = new ProxyOutputPort<>();
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      DummyOperator op = dag.addOperator("O1", new DummyOperator());
+      Level2ModuleB m1 = dag.addModule("M1", new Level2ModuleB());
+      Level1Module m2 = dag.addModule("M2", new Level1Module());
+
+      dag.addStream("s1", op.out1, m1.mIn);
+      dag.addStream("s2", op.out2, m2.mIn);
+
+      mIn.set(op.in);
+      mOut1.set(m1.mOut1);
+      mOut2.set(m2.mOut);
+    }
+  }
+
+  static class NestedModuleApp implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      DummyInputOperator o1 = dag.addOperator("O1", new DummyInputOperator());
+      o1.setInputOperatorProp(1);
+
+      DummyOperator o2 = dag.addOperator("O2", new DummyOperator());
+      o2.setOperatorProp(2);
+
+      Level2ModuleA ma = dag.addModule("Ma", new Level2ModuleA());
+      ma.setLevel2ModuleAProp1(11);
+      ma.setLevel2ModuleAProp2(12);
+      ma.setLevel2ModuleAProp3(13);
+
+      Level2ModuleB mb = dag.addModule("Mb", new Level2ModuleB());
+      mb.setLevel2ModuleBProp1(21);
+      mb.setLevel2ModuleBProp2(22);
+      mb.setLevel2ModuleBProp3(23);
+
+      Level2ModuleA mc = dag.addModule("Mc", new Level2ModuleA());
+      mc.setLevel2ModuleAProp1(31);
+      mc.setLevel2ModuleAProp2(32);
+      mc.setLevel2ModuleAProp3(33);
+
+      Level2ModuleB md = dag.addModule("Md", new Level2ModuleB());
+      md.setLevel2ModuleBProp1(41);
+      md.setLevel2ModuleBProp2(42);
+      md.setLevel2ModuleBProp3(43);
+
+      Level3Module me = dag.addModule("Me", new Level3Module());
+
+      dag.addStream("O1_O2", o1.out, o2.in, me.mIn);
+      dag.addStream("O2_Ma", o2.out1, ma.mIn);
+      dag.addStream("Ma_Mb", ma.mOut1, mb.mIn);
+      dag.addStream("Ma_Md", ma.mOut2, md.mIn);
+      dag.addStream("Mb_Mc", mb.mOut2, mc.mIn);
+    }
+  }
+
+  @Test
+  public void testModuleExtreme()
+  {
+    StreamingApplication app = new NestedModuleApp();
+    Configuration conf = new Configuration(false);
+    LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf);
+    LogicalPlan dag = new LogicalPlan();
+    lpc.prepareDAG(dag, app, "ModuleApp");
+
+    dag.validate();
+    validateTopLevelOperators(dag);
+    validateTopLevelStreams(dag);
+    validatePublicMethods(dag);
+  }
+
+  private void validateTopLevelStreams(LogicalPlan dag)
+  {
+    List<String> streamNames = new ArrayList<>();
+    for (LogicalPlan.StreamMeta streamMeta : dag.getAllStreams()) {
+      streamNames.add(streamMeta.getName());
+    }
+
+    Assert.assertTrue(streamNames.contains(componentName("Mb", "O1_M1")));
+    Assert.assertTrue(streamNames.contains("O2_Ma"));
+    Assert.assertTrue(streamNames.contains("Mb_Mc"));
+    Assert.assertTrue(streamNames.contains(componentName("Mb", "O1_O2")));
+    Assert.assertTrue(streamNames.contains(componentName("Ma", "M1_M2&O1")));
+    Assert.assertTrue(streamNames.contains(componentName("Md", "O1_M1")));
+    Assert.assertTrue(streamNames.contains(componentName("Ma_Md")));
+    Assert.assertTrue(streamNames.contains(componentName("Mc", "M1_M2&O1")));
+    Assert.assertTrue(streamNames.contains(componentName("Md", "O1_O2")));
+    Assert.assertTrue(streamNames.contains("Ma_Mb"));
+    Assert.assertTrue(streamNames.contains("O1_O2"));
+
+    validateSeperateStream(dag, componentName("Mb", "O1_M1"), componentName("Mb", "O1"),
+        componentName("Mb", "M1", "O1"));
+    validateSeperateStream(dag, "O2_Ma", "O2", componentName("Ma", "M1", "O1"));
+    validateSeperateStream(dag, "Mb_Mc", componentName("Mb", "O2"), componentName("Mc", "M1",
"O1"));
+    validateSeperateStream(dag, componentName("Mb", "O1_O2"), componentName("Mb", "O1"),
componentName("Mb", "O2"));
+    validateSeperateStream(dag, componentName("Ma", "M1_M2&O1"), componentName("Ma",
"M1", "O1"),
+        componentName("Ma", "O1"), componentName("Ma", "M2", "O1"));
+    validateSeperateStream(dag, componentName("Md", "O1_M1"), componentName("Md", "O1"),
+        componentName("Md", "M1", "O1"));
+    validateSeperateStream(dag, "Ma_Md", componentName("Ma", "O1"), componentName("Md", "O1"));
+    validateSeperateStream(dag, componentName("Mc", "M1_M2&O1"), componentName("Mc",
"M1", "O1"),
+        componentName("Mc", "O1"), componentName("Mc", "M2", "O1"));
+    validateSeperateStream(dag, componentName("Md", "O1_O2"), componentName("Md", "O1"),
componentName("Md", "O2"));
+    validateSeperateStream(dag, "Ma_Mb", componentName("Ma", "M2", "O1"), componentName("Mb",
"O1"));
+    validateSeperateStream(dag, "O1_O2", "O1", "O2", componentName("Me", "O1"));
+  }
+
+  private void validateSeperateStream(LogicalPlan dag, String streamName, String inputOperatorName,
+      String... outputOperatorNames)
+  {
+    LogicalPlan.StreamMeta streamMeta = dag.getStream(streamName);
+    String sourceName = streamMeta.getSource().getOperatorMeta().getName();
+
+    List<String> sinksName = new ArrayList<>();
+    for (LogicalPlan.InputPortMeta inputPortMeta : streamMeta.getSinks()) {
+      sinksName.add(inputPortMeta.getOperatorWrapper().getName());
+    }
+
+    Assert.assertTrue(inputOperatorName.equals(sourceName));
+    Assert.assertEquals(outputOperatorNames.length, sinksName.size());
+
+    for (String outputOperatorName : outputOperatorNames) {
+      Assert.assertTrue(sinksName.contains(outputOperatorName));
+    }
+  }
+
+  private void validateTopLevelOperators(LogicalPlan dag)
+  {
+    List<String> operatorNames = new ArrayList<>();
+    for (LogicalPlan.OperatorMeta operatorMeta : dag.getAllOperators()) {
+      operatorNames.add(operatorMeta.getName());
+    }
+    Assert.assertTrue(operatorNames.contains("O1"));
+    Assert.assertTrue(operatorNames.contains("O2"));
+    Assert.assertTrue(operatorNames.contains(componentName("Ma", "M1", "O1")));
+    Assert.assertTrue(operatorNames.contains(componentName("Ma", "M2", "O1")));
+    Assert.assertTrue(operatorNames.contains(componentName("Ma", "O1")));
+    Assert.assertTrue(operatorNames.contains(componentName("Mb", "O1")));
+    Assert.assertTrue(operatorNames.contains(componentName("Mb", "M1", "O1")));
+    Assert.assertTrue(operatorNames.contains(componentName("Mb", "O2")));
+    Assert.assertTrue(operatorNames.contains(componentName("Mc", "M1", "O1")));
+    Assert.assertTrue(operatorNames.contains(componentName("Mc", "M2", "O1")));
+    Assert.assertTrue(operatorNames.contains(componentName("Mc", "O1")));
+    Assert.assertTrue(operatorNames.contains(componentName("Md", "O1")));
+    Assert.assertTrue(operatorNames.contains(componentName("Md", "M1", "O1")));
+    Assert.assertTrue(operatorNames.contains(componentName("Md", "O2")));
+
+    validateOperatorPropertyValue(dag, "O1", 1);
+    validateOperatorPropertyValue(dag, "O2", 2);
+    validateOperatorPropertyValue(dag, componentName("Ma", "M1", "O1"), 11);
+    validateOperatorPropertyValue(dag, componentName("Ma", "M2", "O1"), 12);
+    validateOperatorPropertyValue(dag, componentName("Ma", "O1"), 13);
+    validateOperatorPropertyValue(dag, componentName("Mb", "O1"), 21);
+    validateOperatorPropertyValue(dag, componentName("Mb", "M1", "O1"), 22);
+    validateOperatorPropertyValue(dag, componentName("Mb", "O2"), 23);
+    validateOperatorPropertyValue(dag, componentName("Mc", "M1", "O1"), 31);
+    validateOperatorPropertyValue(dag, componentName("Mc", "M2", "O1"), 32);
+    validateOperatorPropertyValue(dag, componentName("Mc", "O1"), 33);
+    validateOperatorPropertyValue(dag, componentName("Md", "O1"), 41);
+    validateOperatorPropertyValue(dag, componentName("Md", "M1", "O1"), 42);
+    validateOperatorPropertyValue(dag, componentName("Md", "O2"), 43);
+
+    validateOperatorParent(dag, "O1", null);
+    validateOperatorParent(dag, "O2", null);
+    validateOperatorParent(dag, componentName("Ma", "M1", "O1"), componentName("Ma", "M1"));
+    validateOperatorParent(dag, componentName("Ma", "M2", "O1"), componentName("Ma", "M2"));
+    validateOperatorParent(dag, componentName("Ma", "O1"), "Ma");
+    validateOperatorParent(dag, componentName("Mb", "O1"), "Mb");
+    validateOperatorParent(dag, componentName("Mb", "M1", "O1"), componentName("Mb", "M1"));
+    validateOperatorParent(dag, componentName("Mb", "O2"), "Mb");
+    validateOperatorParent(dag, componentName("Mc", "M1", "O1"), componentName("Mc", "M1"));
+    validateOperatorParent(dag, componentName("Mc", "M2", "O1"), componentName("Mc", "M2"));
+    validateOperatorParent(dag, componentName("Mc", "O1"), "Mc");
+    validateOperatorParent(dag, componentName("Md", "O1"), "Md");
+    validateOperatorParent(dag, componentName("Md", "M1", "O1"), componentName("Md", "M1"));
+    validateOperatorParent(dag, componentName("Md", "O2"), "Md");
+  }
+
+  /**
+   * Asserts that the operator registered under {@code operatorName} reports
+   * {@code parentModuleName} as its module name.
+   *
+   * @param dag              plan that is queried for the operator's meta object
+   * @param operatorName     name passed to {@code dag.getOperatorMeta}
+   * @param parentModuleName expected module name; {@code null} means the operator is
+   *                         expected to report no module name at all
+   */
+  private void validateOperatorParent(LogicalPlan dag, String operatorName, String parentModuleName)
+  {
+    LogicalPlan.OperatorMeta operatorMeta = dag.getOperatorMeta(operatorName);
+    // Fail with a readable message rather than a NullPointerException if the lookup returns null.
+    Assert.assertNotNull("No operator meta found for " + operatorName, operatorMeta);
+    // assertEquals treats two nulls as equal, so the null and non-null expectations need no
+    // separate branches, and a mismatch reports both the expected and the actual name.
+    Assert.assertEquals("Parent module of operator " + operatorName, parentModuleName,
+        operatorMeta.getModuleName());
+  }
+
+  /**
+   * Checks the integer property of the operator registered under {@code operatorName}.
+   * The operator named exactly "O1" is cast to {@code DummyInputOperator} and its
+   * {@code getInputOperatorProp()} value is compared; every other operator is cast to
+   * {@code DummyOperator} and its {@code getOperatorProp()} value is compared.
+   *
+   * @param dag           plan that is queried for the operator's meta object
+   * @param operatorName  name of the operator to inspect
+   * @param expectedValue value the property is expected to hold
+   */
+  private void validateOperatorPropertyValue(LogicalPlan dag, String operatorName, int expectedValue)
+  {
+    LogicalPlan.OperatorMeta meta = dag.getOperatorMeta(operatorName);
+    if (!operatorName.equals("O1")) {
+      Assert.assertEquals(expectedValue, ((DummyOperator)meta.getOperator()).getOperatorProp());
+      return;
+    }
+    Assert.assertEquals(expectedValue, ((DummyInputOperator)meta.getOperator()).getInputOperatorProp());
+  }
+
+  /**
+   * Checks the module-related accessors of the plan: {@code getAllModules()} must list
+   * the five modules Ma, Mb, Mc, Md and Me, and {@code getModuleMeta("Ma")} must return
+   * a meta object whose name is "Ma".
+   *
+   * @param dag plan whose module accessors are exercised
+   */
+  private void validatePublicMethods(LogicalPlan dag)
+  {
+    // The dag is expected to expose exactly these 5 modules through getAllModules().
+    List<String> moduleNames = new ArrayList<>();
+    for (LogicalPlan.ModuleMeta moduleMeta : dag.getAllModules()) {
+      moduleNames.add(moduleMeta.getName());
+    }
+    for (String expectedName : new String[] {"Ma", "Mb", "Mc", "Md", "Me"}) {
+      // Name the missing module so a failure does not need a debugger to diagnose.
+      Assert.assertTrue("Module " + expectedName + " is missing", moduleNames.contains(expectedName));
+    }
+    Assert.assertEquals("Number of modules are 5", 5, dag.getAllModules().size());
+
+    // The meta object returned by getModuleMeta must carry the requested name.
+    LogicalPlan.ModuleMeta m = dag.getModuleMeta("Ma");
+    // JUnit's assertEquals takes (message, expected, actual); in that order a failure
+    // report labels "Ma" as the expected value rather than as the actual one.
+    Assert.assertEquals("Name of module is Ma", "Ma", m.getName());
+  }
+
+  /**
+   * Joins the given name segments into one component name, placing
+   * {@code LogicalPlan.MODULE_NAMESPACE_SEPARATOR} between consecutive segments.
+   *
+   * @param names segments in order, e.g. module name(s) followed by an operator name
+   * @return the joined name, or an empty string when no segments are given
+   */
+  private static String componentName(String... names)
+  {
+    if (names.length == 0) {
+      return "";
+    }
+    // Seed with the first segment so the separator only ever appears between segments.
+    StringBuilder qualified = new StringBuilder(names[0]);
+    int next = 1;
+    while (next < names.length) {
+      qualified.append(LogicalPlan.MODULE_NAMESPACE_SEPARATOR).append(names[next]);
+      next++;
+    }
+    return qualified.toString();
+  }
+
+  /**
+   * Name clash between a top level operator and an operator coming from a module.
+   * A top level operator is registered under the namespaced name
+   * {@code componentName("m1", "O1")}; module "m1" is then added, which is meant to
+   * populate an operator "O1" of its own and thereby collide with the top level operator.
+   * The test passes only if an {@link IllegalArgumentException} is thrown while the
+   * plan is built, prepared or validated.
+   */
+  @Test(expected = java.lang.IllegalArgumentException.class)
+  public void conflictingNamesWithExpandedModule()
+  {
+    LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(new Configuration(false));
+    LogicalPlan plan = new LogicalPlan();
+    DummyInputOperator source = plan.addOperator(componentName("m1", "O1"), new DummyInputOperator());
+    Level2ModuleA expanded = plan.addModule("m1", new Level2ModuleA());
+    plan.addStream("s1", source.out, expanded.mIn);
+    planConf.prepareDAG(plan, null, "ModuleApp");
+    plan.validate();
+  }
+
+  /**
+   * A module and an operator with the same name are not allowed in a DAG, to prevent
+   * property conflicts. This variant adds the operator "M1" first and the module "M1"
+   * second, and passes only if an {@link IllegalArgumentException} is thrown.
+   */
+  @Test(expected = java.lang.IllegalArgumentException.class)
+  public void conflictingNamesWithOperator1()
+  {
+    LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(new Configuration(false));
+    LogicalPlan plan = new LogicalPlan();
+    DummyInputOperator source = plan.addOperator("M1", new DummyInputOperator());
+    Level2ModuleA sameNamedModule = plan.addModule("M1", new Level2ModuleA());
+    plan.addStream("s1", source.out, sameNamedModule.mIn);
+    planConf.prepareDAG(plan, null, "ModuleApp");
+    plan.validate();
+  }
+
+  /**
+   * A module and an operator with the same name are not allowed in a DAG, to prevent
+   * property conflicts. This variant adds the module "M1" first and the operator "M1"
+   * second, and passes only if an {@link IllegalArgumentException} is thrown.
+   */
+  @Test(expected = java.lang.IllegalArgumentException.class)
+  public void conflictingNamesWithOperator2()
+  {
+    LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(new Configuration(false));
+    LogicalPlan plan = new LogicalPlan();
+    Level2ModuleA sameNamedModule = plan.addModule("M1", new Level2ModuleA());
+    DummyInputOperator source = plan.addOperator("M1", new DummyInputOperator());
+    plan.addStream("s1", source.out, sameNamedModule.mIn);
+    planConf.prepareDAG(plan, null, "ModuleApp");
+    plan.validate();
+  }
+}


Mime
View raw message