apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [1/2] incubator-apex-malhar git commit: APEXCORE-60 added iteration demo
Date Thu, 25 Feb 2016 21:44:06 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 ca5ab1124 -> 8331f56da


APEXCORE-60 added iteration demo


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/d13c6f77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/d13c6f77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/d13c6f77

Branch: refs/heads/devel-3
Commit: d13c6f77c5c1ec1dfc4387bb7b6d6ecb220831d5
Parents: 9c557fc
Author: David Yan <david@datatorrent.com>
Authored: Thu Nov 19 13:20:38 2015 -0800
Committer: David Yan <david@datatorrent.com>
Committed: Tue Feb 23 10:03:29 2016 -0800

----------------------------------------------------------------------
 demos/iteration/pom.xml                         |  37 ++++
 demos/iteration/src/assemble/appPackage.xml     |  59 +++++++
 .../demos/iteration/Application.java            | 168 +++++++++++++++++++
 .../demos/iteration/package-info.java           |  22 +++
 .../src/main/resources/META-INF/properties.xml  |  44 +++++
 .../demos/iteration/ApplicationTest.java        |  86 ++++++++++
 .../src/test/resources/log4j.properties         |  40 +++++
 demos/pom.xml                                   |   1 +
 8 files changed, 457 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/pom.xml
----------------------------------------------------------------------
diff --git a/demos/iteration/pom.xml b/demos/iteration/pom.xml
new file mode 100644
index 0000000..5891f42
--- /dev/null
+++ b/demos/iteration/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>iteration-demo</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Apex Malhar (incubating) Iteration Demo</name>
+  <description>DataTorrent demo applications that demonstrates the iteration feature.</description>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-demos</artifactId>
+    <version>3.4.0-incubating-SNAPSHOT</version>
+  </parent>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/demos/iteration/src/assemble/appPackage.xml b/demos/iteration/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/demos/iteration/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java
----------------------------------------------------------------------
diff --git a/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java
b/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java
new file mode 100644
index 0000000..c0178d8
--- /dev/null
+++ b/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.demos.iteration;
+
+import com.datatorrent.api.Context;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.common.util.DefaultDelayOperator;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+/**
+ * Iteration demo : <br>
+ *
+ * <pre>
+ * LocalMode.runApp(new Application(), 600000); // 10 min run
+ * </pre>
+ *
+ * Run Success : <br>
+ * For successful deployment and run, user should see the Fibonacci sequence, something like
the
+ * following output on the console:
+ *
+ * <pre>
+ * 1
+ * 1
+ * 2
+ * 3
+ * 5
+ * 8
+ * 13
+ * 21
+ * 34
+ * 55
+ * ...
+ * </pre>
+ *
+ */
+@ApplicationAnnotation(name="IterationDemo")
+public class Application implements StreamingApplication
+{
+  private final static Logger LOG = LoggerFactory.getLogger(Application.class);
+  private String extraOutputFileName; // for unit test
+
+  public static class FibonacciOperator extends BaseOperator
+  {
+    public long currentNumber = 1;
+    private transient long tempNum;
+    public transient DefaultInputPort<Object> dummyInputPort = new DefaultInputPort<Object>()
+    {
+      @Override
+      public void process(Object tuple)
+      {
+      }
+    };
+    public transient DefaultInputPort<Long> input = new DefaultInputPort<Long>()
+    {
+      @Override
+      public void process(Long tuple)
+      {
+        tempNum = (currentNumber == 1) ? 1 : tuple;
+      }
+    };
+    public transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
+
+
+    @Override
+    public void endWindow()
+    {
+      output.emit(currentNumber);
+      currentNumber += tempNum;
+      if (currentNumber <= 0) {
+        currentNumber = 1;
+      }
+    }
+  }
+
+  public static class StdoutOperator extends BaseOperator
+  {
+    private String extraOutputFileName; // for unit test
+    private transient PrintStream extraOutputStream;
+    /**
+     * This is the input port which receives the tuples that will be written to stdout.
+     */
+    public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+    {
+      @Override
+      @SuppressWarnings("UseOfSystemOutOrSystemErr")
+      public void process(Object t)
+      {
+        String s = t.toString();
+        System.out.println(s);
+        if (extraOutputStream != null) {
+          extraOutputStream.println(s);
+        }
+      }
+    };
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      if (extraOutputFileName != null) {
+        try {
+          extraOutputStream = new PrintStream(new FileOutputStream(extraOutputFileName),
true);
+        } catch (IOException ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+
+    @Override
+    public void teardown()
+    {
+      extraOutputStream.close();
+    }
+
+    public void setExtraOutputFileName(String fileName)
+    {
+      this.extraOutputFileName = fileName;
+    }
+  }
+
+  public void setExtraOutputFileName(String fileName)
+  {
+    this.extraOutputFileName = fileName;
+  }
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
+    FibonacciOperator fib = dag.addOperator("FIB", FibonacciOperator.class);
+    DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+    StdoutOperator console = new StdoutOperator();
+    console.setExtraOutputFileName(extraOutputFileName);
+    dag.addOperator("console", console);
+    dag.addStream("dummy_to_operator", rand.integer_data, fib.dummyInputPort);
+    dag.addStream("operator_to_delay", fib.output, opDelay.input, console.input);
+    dag.addStream("delay_to_operator", opDelay.output, fib.input);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/src/main/java/com/datatorrent/demos/iteration/package-info.java
----------------------------------------------------------------------
diff --git a/demos/iteration/src/main/java/com/datatorrent/demos/iteration/package-info.java
b/demos/iteration/src/main/java/com/datatorrent/demos/iteration/package-info.java
new file mode 100644
index 0000000..0d24638
--- /dev/null
+++ b/demos/iteration/src/main/java/com/datatorrent/demos/iteration/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Iteration demonstration application.
+ */
+package com.datatorrent.demos.iteration;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/iteration/src/main/resources/META-INF/properties.xml b/demos/iteration/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..bf65e22
--- /dev/null
+++ b/demos/iteration/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <!-- Memory settings for all demos -->
+  <property>
+    <name>dt.attr.MASTER_MEMORY_MB</name>
+    <value>512</value>
+  </property>
+  <property>
+    <name>dt.attr.DEBUG</name>
+    <value>true</value>
+  <property>
+    <name>dt.application.*.operator.*.attr.MEMORY_MB</name>
+    <value>128</value>
+  </property>
+  <property>
+    <name>dt.application.*.operator.*.attr.JVM_OPTIONS</name>
+    <value>-Xmx128M</value>
+  </property>
+  <property>
+    <name>dt.application.*.operator.*.port.*.attr.BUFFER_MEMORY_MB</name>
+    <value>128</value>
+  </property>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java
b/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java
new file mode 100644
index 0000000..7804fcd
--- /dev/null
+++ b/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.demos.iteration;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.api.LocalMode;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+
+
+/**
+ *
+ */
+public class ApplicationTest
+{
+  @Test
+  public void testIterationApp() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    Application app = new Application();
+    String outputFileName = "target/output.txt";
+    long timeout = 10 * 1000; // 10 seconds
+
+    new File(outputFileName).delete();
+    app.setExtraOutputFileName(outputFileName);
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.runAsync();
+
+    long startTime = System.currentTimeMillis();
+    do {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException ex) {
+        break;
+      }
+      File file = new File(outputFileName);
+      if (file.length() > 50) {
+        break;
+      }
+    } while (System.currentTimeMillis() - startTime < timeout);
+
+    lc.shutdown();
+    try (BufferedReader br = new BufferedReader(new FileReader(outputFileName))) {
+      Assert.assertEquals("1", br.readLine());
+      Assert.assertEquals("1", br.readLine());
+      Assert.assertEquals("2", br.readLine());
+      Assert.assertEquals("3", br.readLine());
+      Assert.assertEquals("5", br.readLine());
+      Assert.assertEquals("8", br.readLine());
+      Assert.assertEquals("13", br.readLine());
+      Assert.assertEquals("21", br.readLine());
+      Assert.assertEquals("34", br.readLine());
+      Assert.assertEquals("55", br.readLine());
+      Assert.assertEquals("89", br.readLine());
+      Assert.assertEquals("144", br.readLine());
+      Assert.assertEquals("233", br.readLine());
+      Assert.assertEquals("377", br.readLine());
+      Assert.assertEquals("610", br.readLine());
+      Assert.assertEquals("987", br.readLine());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/demos/iteration/src/test/resources/log4j.properties b/demos/iteration/src/test/resources/log4j.properties
new file mode 100644
index 0000000..451cff3
--- /dev/null
+++ b/demos/iteration/src/test/resources/log4j.properties
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
index e650ea2..032583a 100644
--- a/demos/pom.xml
+++ b/demos/pom.xml
@@ -184,6 +184,7 @@
     <module>uniquecount</module>
     <module>r</module>
     <module>echoserver</module>
+    <module>iteration</module>
   </modules>
 
   <dependencies>


Mime
View raw message