apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [04/30] apex-malhar git commit: Renamed demos to examples. Packages and artifactid names are changed as suggested.
Date Tue, 07 Mar 2017 06:58:09 GMT
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
new file mode 100644
index 0000000..30ce061
--- /dev/null
+++ b/examples/pom.xml
@@ -0,0 +1,231 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar</artifactId>
+    <version>3.7.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>malhar-examples</artifactId>
+  <packaging>pom</packaging>
+  <name>Apache Apex Malhar Examples</name>
+
+  <properties>
+    <apex.apppackage.groupid>${project.groupId}</apex.apppackage.groupid>
+    <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
+    <semver.plugin.skip>true</semver.plugin.skip>
+    <maven.deploy.skip>true</maven.deploy.skip>
+  </properties>
+
+  <profiles>
+	<profile>
+    <id>example-plugin-activation</id>
+    <activation>
+        <file>
+            <exists>${basedir}/src/main</exists>
+        </file>
+    </activation>
+    <build>
+      <plugins>
+        <plugin>
+          <artifactId>maven-eclipse-plugin</artifactId>
+          <version>2.9</version>
+          <configuration>
+            <downloadSources>true</downloadSources>
+          </configuration>
+        </plugin>
+        <plugin>
+          <artifactId>maven-compiler-plugin</artifactId>
+          <version>2.3.2</version>
+          <configuration>
+            <encoding>UTF-8</encoding>
+            <source>1.7</source>
+            <target>1.7</target>
+            <debug>true</debug>
+            <optimize>false</optimize>
+            <showDeprecation>true</showDeprecation>
+            <showWarnings>true</showWarnings>
+          </configuration>
+        </plugin>
+        <plugin>
+          <artifactId>maven-dependency-plugin</artifactId>
+          <version>2.8</version>
+          <executions>
+            <execution>
+              <id>copy-dependencies</id>
+              <phase>prepare-package</phase>
+              <goals>
+                <goal>copy-dependencies</goal>
+              </goals>
+              <configuration>
+                <outputDirectory>target/deps</outputDirectory>
+                <includeScope>runtime</includeScope>
+              </configuration>
+            </execution>
+          </executions>
+        </plugin>
+        <plugin>
+          <artifactId>maven-assembly-plugin</artifactId>
+          <executions>
+            <execution>
+              <id>app-package-assembly</id>
+              <phase>package</phase>
+              <goals>
+                <goal>single</goal>
+              </goals>
+              <configuration>
+                <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
+                <appendAssemblyId>false</appendAssemblyId>
+                <descriptors>
+                  <descriptor>src/assemble/appPackage.xml</descriptor>
+                </descriptors>
+                <archiverConfig>
+                  <defaultDirectoryMode>0755</defaultDirectoryMode>
+                </archiverConfig>
+                <archive>
+                  <manifestEntries>
+                    <Class-Path>${apex.apppackage.classpath}</Class-Path>
+                    <DT-Engine-Version>${apex.core.version}</DT-Engine-Version>
+                    <DT-App-Package-Group-Id>${apex.apppackage.groupid}</DT-App-Package-Group-Id>
+                    <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
+                    <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
+                    <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
+                    <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
+                  </manifestEntries>
+                </archive>
+              </configuration>
+            </execution>
+          </executions>
+        </plugin>
+        <plugin>
+          <artifactId>maven-antrun-plugin</artifactId>
+          <version>1.7</version>
+          <executions>
+            <execution>
+              <phase>package</phase>
+              <configuration>
+                <target>
+                  <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
+                        tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
+                </target>
+              </configuration>
+              <goals>
+                <goal>run</goal>
+              </goals>
+            </execution>
+          </executions>
+        </plugin>
+        <plugin>
+          <groupId>org.codehaus.mojo</groupId>
+          <artifactId>build-helper-maven-plugin</artifactId>
+          <version>1.9.1</version>
+          <executions>
+            <execution>
+              <id>attach-artifacts</id>
+              <phase>package</phase>
+              <goals>
+                <goal>attach-artifact</goal>
+              </goals>
+              <configuration>
+                <artifacts>
+                  <artifact>
+                    <file>target/${project.artifactId}-${project.version}.apa</file>
+                    <type>apa</type>
+                  </artifact>
+                </artifacts>
+                <skipAttach>false</skipAttach>
+              </configuration>
+            </execution>
+          </executions>
+        </plugin>
+      </plugins>
+	</build>
+	</profile>
+    <profile>
+      <id>all-modules</id>
+      <modules>
+        <module>distributedistinct</module>
+        <module>highlevelapi</module>
+        <module>sql</module>
+      </modules>
+    </profile>
+  </profiles>
+
+  <modules>
+    <module>machinedata</module>
+    <module>pi</module>
+    <module>twitter</module>
+    <module>yahoofinance</module>
+    <module>frauddetect</module>
+    <module>mobile</module>
+    <module>wordcount</module>
+    <module>mrmonitor</module>
+    <module>mroperator</module>
+    <module>uniquecount</module>
+    <module>r</module>
+    <module>echoserver</module>
+    <module>iteration</module>
+  </modules>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>apex-common</artifactId>
+      <version>${apex.core.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.10</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/pom.xml
----------------------------------------------------------------------
diff --git a/examples/r/pom.xml b/examples/r/pom.xml
new file mode 100644
index 0000000..46b7d47
--- /dev/null
+++ b/examples/r/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>malhar-examples-r</artifactId>
+  <packaging>jar</packaging>
+
+ <name>Apache Apex Malhar R Example</name>
+  <description>Apex example applications for using R.</description>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-examples</artifactId>
+    <version>3.7.0-SNAPSHOT</version>
+  </parent>
+
+  <properties>
+    <skipTests>true</skipTests>
+  </properties>
+
+  <repositories>
+    <repository>
+      <id>datatorrent-3rd-party</id>
+      <name>Embedded repository for dependencies not available online</name>
+      <url>https://www.datatorrent.com/maven/content/repositories/thirdparty</url>
+      <snapshots>
+        <updatePolicy>daily</updatePolicy>
+      </snapshots>
+      <releases>
+        <updatePolicy>daily</updatePolicy>
+      </releases>
+    </repository>
+  </repositories>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.rosuda</groupId>
+      <artifactId>jri</artifactId>
+      <version>1.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.rosuda</groupId>
+      <artifactId>rengine</artifactId>
+      <version>1.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.rosuda</groupId>
+      <artifactId>jriengine</artifactId>
+      <version>1.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-contrib</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/r/src/assemble/appPackage.xml b/examples/r/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/r/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/FaithfulKey.java
----------------------------------------------------------------------
diff --git a/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/FaithfulKey.java b/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/FaithfulKey.java
new file mode 100755
index 0000000..8c08940
--- /dev/null
+++ b/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/FaithfulKey.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.r.oldfaithful;
+
+/**
+ * Tuple carrying a single Old Faithful reading: the duration of an eruption
+ * together with the waiting time associated with it.
+ * <p>
+ * The class implements {@link java.io.Serializable} so that the declared
+ * {@code serialVersionUID} is actually taken into account; Java serialization
+ * ignores that constant on classes that do not implement the interface.
+ *
+ * @since 2.1.0
+ */
+public class FaithfulKey implements java.io.Serializable
+{
+
+  private static final long serialVersionUID = 201403251620L;
+
+  /** Duration of the eruption for this reading (unit is not specified by this class). */
+  private double eruptionDuration;
+  /** Waiting time for this reading (unit is not specified by this class). */
+  private int waitingTime;
+
+  /**
+   * Creates an empty reading; both values start at zero and are expected to be
+   * populated through the setters.
+   */
+  public FaithfulKey()
+  {
+  }
+
+  /**
+   * @return the eruption duration stored in this reading
+   */
+  public double getEruptionDuration()
+  {
+    return eruptionDuration;
+  }
+
+  /**
+   * @param eruptionDuration the eruption duration to store in this reading
+   */
+  public void setEruptionDuration(double eruptionDuration)
+  {
+    this.eruptionDuration = eruptionDuration;
+  }
+
+  /**
+   * @return the waiting time stored in this reading
+   */
+  public int getWaitingTime()
+  {
+    return waitingTime;
+  }
+
+  /**
+   * @param waitingTime the waiting time to store in this reading
+   */
+  public void setWaitingTime(int waitingTime)
+  {
+    this.waitingTime = waitingTime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/FaithfulRScript.java
----------------------------------------------------------------------
diff --git a/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/FaithfulRScript.java b/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/FaithfulRScript.java
new file mode 100755
index 0000000..4b61d42
--- /dev/null
+++ b/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/FaithfulRScript.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.r.oldfaithful;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.contrib.r.RScript;
+
+/**
+ * {@link RScript} operator used by the Old Faithful example.
+ * <p>
+ * Readings arriving on {@link #faithfulInput} are buffered; the latest value
+ * arriving on {@link #inputElapsedTime} is remembered. In {@link #endWindow()}
+ * the buffered readings are converted to arrays and handed, together with the
+ * elapsed time, to the parent's {@code process} method under the keys
+ * {@code ELAPSEDTIME}, {@code ERUPTIONS} and {@code WAITING}.
+ *
+ * @since 2.1.0
+ */
+public class FaithfulRScript extends RScript
+{
+
+  private static final Logger LOG = LoggerFactory.getLogger(FaithfulRScript.class);
+
+  // Readings received since the buffer was last flushed in endWindow(); declared transient.
+  private transient List<FaithfulKey> readingsList = new ArrayList<>();
+  // Most recent value received on inputElapsedTime; it is not reset between windows.
+  private int elapsedTime;
+
+  /**
+   * Creates the operator without script settings; delegates to the parent's
+   * no-argument constructor.
+   */
+  public FaithfulRScript()
+  {
+    super();
+  }
+
+  /**
+   * Creates the operator and forwards all three arguments unchanged to the
+   * parent constructor.
+   *
+   * @param rScriptFilePath location of the R script, as expected by {@link RScript}
+   * @param rFunction name of the R function to invoke
+   * @param returnVariable name of the R variable holding the result
+   */
+  public FaithfulRScript(String rScriptFilePath, String rFunction, String returnVariable)
+  {
+    super(rScriptFilePath, rFunction, returnVariable);
+  }
+
+  /**
+   * Optional input port for readings; each tuple is only buffered here and is
+   * passed on to the RScript operator's process() in {@link #endWindow()}.
+   */
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<FaithfulKey> faithfulInput = new DefaultInputPort<FaithfulKey>()
+  {
+    @Override
+    public void process(FaithfulKey tuple)
+    {
+      readingsList.add(tuple);
+    }
+
+  };
+
+  /**
+   * Optional input port for the elapsed time; a newer value overwrites the
+   * previous one.
+   */
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<Integer> inputElapsedTime = new DefaultInputPort<Integer>()
+  {
+    @Override
+    public void process(Integer eT)
+    {
+      elapsedTime = eT;
+    }
+  };
+
+  /**
+   * Delegates to the parent's setup without adding any behavior of its own.
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+  }
+
+  /**
+   * Flushes the buffered readings: does nothing when the buffer is empty,
+   * otherwise builds the argument map, calls the parent's {@code process}
+   * and clears the buffer.
+   */
+  @Override
+  public void endWindow()
+  {
+    if (readingsList.isEmpty()) {
+      return;
+    }
+    final int count = readingsList.size();
+    LOG.info("Input data size: readingsList - {}", count);
+
+    double[] eruptionDuration = new double[count];
+    int[] waitingTime = new int[count];
+
+    // Split the buffered readings into two parallel arrays, preserving arrival order.
+    int idx = 0;
+    for (FaithfulKey reading : readingsList) {
+      eruptionDuration[idx] = reading.getEruptionDuration();
+      waitingTime[idx] = reading.getWaitingTime();
+      idx++;
+    }
+    LOG.info("Input data size: eruptionDuration - {}", eruptionDuration.length);
+    LOG.info("Input data size: waitingTime - {}", waitingTime.length);
+
+    HashMap<String, Object> map = new HashMap<>();
+
+    map.put("ELAPSEDTIME", elapsedTime);
+    map.put("ERUPTIONS", eruptionDuration);
+    map.put("WAITING", waitingTime);
+
+    super.process(map);
+    readingsList.clear();
+    map.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/InputGenerator.java
----------------------------------------------------------------------
diff --git a/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/InputGenerator.java b/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/InputGenerator.java
new file mode 100755
index 0000000..fb18726
--- /dev/null
+++ b/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/InputGenerator.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.r.oldfaithful;
+
+import java.util.Random;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+
+/**
+ * The InputGenerator operator is used to generate input for the 'Old Faithful Geyser' application.
+ * This application accepts readings for the waiting time and the subsequent eruption duration
+ * of the 'Old Faithful' and based on this data, tries to predict the eruption duration of the next
+ * eruption given the elapsed time since the last eruption.
+ * The training data consists of multiple waiting times and eruption duration values;
+ * {@code blastCount} readings are emitted per call to {@link #emitTuples()}.
+ * Each call to {@link #emitTuples()} also emits at most one 'elapsed time' input for which the
+ * prediction would be made.
+ *
+ * @since 2.1.0
+ */
+
+public class InputGenerator implements InputOperator
+{
+
+  private static final Logger LOG = LoggerFactory.getLogger(InputGenerator.class);
+  // Number of readings emitted per emitTuples() call.
+  private int blastCount = 1000;
+  private Random random = new Random();
+  // Counts nextRandomId() calls for throttling; static, hence shared by all instances in the JVM.
+  private static int emitCount = 0;
+
+  /** Output port for the generated readings. */
+  public final transient DefaultOutputPort<FaithfulKey> outputPort = new DefaultOutputPort<FaithfulKey>();
+
+  /** Output port for the elapsed-time value the prediction is requested for. */
+  public final transient DefaultOutputPort<Integer> elapsedTime = new DefaultOutputPort<Integer>();
+
+  /**
+   * @param blastCount number of readings to emit per {@link #emitTuples()} call
+   */
+  public void setBlastCount(int blastCount)
+  {
+    this.blastCount = blastCount;
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  /**
+   * Draws a random value: the absolute value of a Gaussian sample scaled by
+   * {@code max} is redrawn until it is below {@code max}, then raised to
+   * {@code min} if it is smaller. As a side effect, every 97th call sleeps
+   * for one millisecond to slow down input generation.
+   *
+   * @param min lower bound; smaller draws are clamped to this value
+   * @param max exclusive upper bound, also used as the scale of the Gaussian sample
+   * @return a value in {@code [min, max)} when {@code min < max}
+   */
+  private int nextRandomId(int min, int max)
+  {
+    int id;
+    do {
+      id = (int)Math.abs(Math.round(random.nextGaussian() * max));
+    }
+    while (id >= max);
+
+    if (id < min) {
+      id = min;
+    }
+    try {
+      // Slowdown input generation
+      if (emitCount++ % 97 == 0) {
+        Thread.sleep(1);
+      }
+    } catch (InterruptedException e) {
+      // Keep the interruption visible to the calling thread instead of swallowing it.
+      Thread.currentThread().interrupt();
+      LOG.warn("Interrupted while throttling input generation", e);
+    }
+    return id;
+  }
+
+  /**
+   * Emits {@code blastCount} readings. The waiting time is drawn with
+   * {@code nextRandomId(3600, 36000)} and the eruption duration is computed as
+   * {@code -2.15 + 0.05 * waitingTime}. One elapsed-time value
+   * ({@code 54 + waitingTime} of the first reading) is emitted per call.
+   *
+   * @throws RuntimeException wrapping any exception raised while emitting
+   */
+  @Override
+  public void emitTuples()
+  {
+    boolean elapsedTimeSent = false;
+
+    try {
+      for (int i = 0; i < blastCount; ++i) {
+        int waitingTime = nextRandomId(3600, 36000);
+
+        double eruptionDuration = -2.15 + 0.05 * waitingTime;
+        emitTuple(eruptionDuration, waitingTime);
+
+        // The flag guarantees a single elapsed-time tuple per call; i == 0 already satisfies the modulo test.
+        if (!elapsedTimeSent && i % 100 == 0) {
+          emitElapsedTime(54 + waitingTime);
+          elapsedTimeSent = true;
+        }
+      }
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Wraps the two values in a new {@link FaithfulKey} and emits it on {@link #outputPort}.
+   */
+  private void emitTuple(double eruptionDuration, int waitingTime)
+  {
+    FaithfulKey faithfulkey = new FaithfulKey();
+
+    faithfulkey.setEruptionDuration(eruptionDuration);
+    faithfulkey.setWaitingTime(waitingTime);
+
+    this.outputPort.emit(faithfulkey);
+  }
+
+  /**
+   * Emits the given value on the {@link #elapsedTime} port.
+   */
+  private void emitElapsedTime(int eT)
+  {
+    this.elapsedTime.emit(eT);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/OldFaithfulApplication.java
----------------------------------------------------------------------
diff --git a/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/OldFaithfulApplication.java b/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/OldFaithfulApplication.java
new file mode 100755
index 0000000..bd51c29
--- /dev/null
+++ b/examples/r/src/main/java/org/apache/apex/examples/r/oldfaithful/OldFaithfulApplication.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.r.oldfaithful;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+/**
+ * The application attempts to simulate 'Old Faithful Geyser' eruption.
+ * This application accepts readings for the waiting time and the subsequent eruption duration
+ * of the 'Old Faithful' and based on this data, tries to predict the eruption duration of the next
+ * eruption given the elapsed time since the last eruption.
+ * The training data consists of multiple waiting times and eruption duration values,
+ * accompanied by an 'elapsed time' input for which the prediction would be made.
+ * Model in R is in file eruptionModel.R located at
+ * examples/r/src/main/resources/org/apache/apex/examples/r/oldfaithful/ directory
+ *
+ * @since 2.1.0
+ */
+
+@ApplicationAnnotation(name = "OldFaithfulApplication")
+public class OldFaithfulApplication implements StreamingApplication
+{
+  // No explicit locality is requested; null is passed to setLocality() for every stream.
+  private final DAG.Locality locality = null;
+
+  /**
+   * Create the DAG: an input generator feeding readings and elapsed time into
+   * the R script operator, whose string output goes to a console operator.
+   *
+   * @param dag the DAG to which operators and streams are added
+   * @param conf configuration supplied by the platform; not read by this method
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+
+    InputGenerator randomInputGenerator = dag.addOperator("rand", new InputGenerator());
+    // The script path must follow the package the resource is shipped under (org/apache/apex/examples/...).
+    FaithfulRScript rScriptOp = dag.addOperator("rScriptOp", new FaithfulRScript("org/apache/apex/examples/r/oldfaithful/eruptionModel.R", "eruptionModel", "retVal"));
+    ConsoleOutputOperator consoles = dag.addOperator("consoles", new ConsoleOutputOperator());
+
+    // Declares the R type of each argument the operator passes to the script.
+    Map<String, FaithfulRScript.REXP_TYPE> argTypeMap = new HashMap<>();
+
+    argTypeMap.put("ELAPSEDTIME", FaithfulRScript.REXP_TYPE.REXP_INT);
+    argTypeMap.put("ERUPTIONS", FaithfulRScript.REXP_TYPE.REXP_ARRAY_DOUBLE);
+    argTypeMap.put("WAITING", FaithfulRScript.REXP_TYPE.REXP_ARRAY_INT);
+
+    rScriptOp.setArgTypeMap(argTypeMap);
+
+    dag.addStream("ingen_faithfulRscript", randomInputGenerator.outputPort, rScriptOp.faithfulInput).setLocality(locality);
+    dag.addStream("ingen_faithfulRscript_eT", randomInputGenerator.elapsedTime, rScriptOp.inputElapsedTime).setLocality(locality);
+    dag.addStream("faithfulRscript_console_s", rScriptOp.strOutput, consoles.input).setLocality(locality);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/r/src/main/resources/META-INF/properties.xml b/examples/r/src/main/resources/META-INF/properties.xml
new file mode 100755
index 0000000..07c1e87
--- /dev/null
+++ b/examples/r/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+<!--properties for R example  -->
+  <property>
+        <name>dt.application.OldFaithfulApplication.class</name>
+        <value>org.apache.apex.examples.r.oldfaithful.OldFaithfulApplication</value>
+        <description>An alias for OldFaithful application</description>
+  </property>
+
+  <property>
+        <name>dt.application.OldFaithfulApplication.operator.*.attr.MEMORY_MB</name>
+        <value>1024</value>
+  </property>
+  
+<!-- Needed so that the container JVM can locate the native libraries -->
+  <property>
+      <name>dt.attr.CONTAINER_JVM_OPTIONS</name>
+      <value>-Djava.library.path=/usr/local/lib/R/site-library/rJava/jri/</value>
+  </property>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/src/main/resources/org/apache/apex/examples/r/oldfaithful/eruptionModel.R
----------------------------------------------------------------------
diff --git a/examples/r/src/main/resources/org/apache/apex/examples/r/oldfaithful/eruptionModel.R b/examples/r/src/main/resources/org/apache/apex/examples/r/oldfaithful/eruptionModel.R
new file mode 100755
index 0000000..e46fa8d
--- /dev/null
+++ b/examples/r/src/main/resources/org/apache/apex/examples/r/oldfaithful/eruptionModel.R
@@ -0,0 +1,60 @@
+#!/usr/bin/Rscript
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+# This script applies a simple linear regression model to the data set 'faithful'
+# and estimates the next eruption duration given the waiting time since the last eruption.
+#
+
+ eruptionModel <- function() {
+
+ datavar = data.frame(ERUPTIONS, WAITING)
+
+ # Attach the data frame; ERUPTIONS, WAITING and ELAPSEDTIME are not defined here - presumably set in the R workspace by the caller (TODO confirm)
+ attach(datavar)
+
+ # Fit a linear model using lm(FORMULA, DATAVAR):
+ # regress the eruption duration (ERUPTIONS) on the waiting time since the last eruption (WAITING)
+ eruption.lm <- lm(ERUPTIONS ~ WAITING, datavar)
+
+ # NOTE(review): a bare expression inside a function is not auto-printed; use print() if display is intended
+ eruption.lm
+
+ # Extract the intercept and the WAITING coefficient (slope) of the fitted model
+ interc<-eruption.lm$coeff[["(Intercept)"]]
+ eruptionDuration<-eruption.lm$coeff[["WAITING"]]
+
+ # Predict the next eruption duration for ELAPSEDTIME from the fitted line: intercept + slope * ELAPSEDTIME
+ nextEruptionDuration<-(interc+(eruptionDuration * ELAPSEDTIME))
+
+retVal<-paste("nextEruptionDuration ", nextEruptionDuration, sep=": ")
+#retVal<-c("interc : ",interc, ", eruptionDuration : ", eruptionDuration,", nextEruptionDuration : ", nextEruptionDuration)
+
+sort( sapply(mget(ls()),object.size) )
+
+detach(datavar);
+
+# Remove the data frame and the input vectors (NOTE(review): rm() acts on the calling environment by default - confirm the globals are actually cleared)
+rm(datavar);
+rm(ERUPTIONS);
+rm(WAITING);
+
+return(retVal)
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/src/test/java/org/apache/apex/examples/r/oldfaithful/OldFaithfulApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/r/src/test/java/org/apache/apex/examples/r/oldfaithful/OldFaithfulApplicationTest.java b/examples/r/src/test/java/org/apache/apex/examples/r/oldfaithful/OldFaithfulApplicationTest.java
new file mode 100755
index 0000000..0ebe958
--- /dev/null
+++ b/examples/r/src/test/java/org/apache/apex/examples/r/oldfaithful/OldFaithfulApplicationTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.r.oldfaithful;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Smoke test that builds the OldFaithful example DAG and runs it through {@link LocalMode}.
+ */
+public class OldFaithfulApplicationTest
+{
+  private static final Logger LOG = LoggerFactory.getLogger(OldFaithfulApplicationTest.class);
+
+  /**
+   * Populates the DAG using {@code new Configuration(false)} and runs the local controller
+   * with heartbeat monitoring disabled. Any exception raised while running is logged and
+   * reported as a test failure.
+   */
+  @Test
+  public void testSomeMethod() throws Exception
+  {
+    final LocalMode localMode = LocalMode.newInstance();
+    new OldFaithfulApplication().populateDAG(localMode.getDAG(), new Configuration(false));
+
+    try {
+      final LocalMode.Controller controller = localMode.getController();
+      controller.setHeartbeatMonitoringEnabled(false);
+      // run(5000): presumably a run duration in milliseconds - confirm against LocalMode.Controller
+      controller.run(5000);
+    } catch (Exception e) {
+      LOG.error("Exception: ", e);
+      Assert.fail("Unexpected exception.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/src/test/resources/dt-site-oldfaithful.xml
----------------------------------------------------------------------
diff --git a/examples/r/src/test/resources/dt-site-oldfaithful.xml b/examples/r/src/test/resources/dt-site-oldfaithful.xml
new file mode 100755
index 0000000..07c1e87
--- /dev/null
+++ b/examples/r/src/test/resources/dt-site-oldfaithful.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+<!--properties for R example  -->
+  <property>
+        <name>dt.application.OldFaithfulApplication.class</name>
+        <value>org.apache.apex.examples.r.oldfaithful.OldFaithfulApplication</value>
+        <description>An alias for OldFaithful application</description>
+  </property>
+
+  <property>
+        <name>dt.application.OldFaithfulApplication.operator.*.attr.MEMORY_MB</name>
+        <value>1024</value>
+  </property>
+  
+<!-- Needed so the JVM can locate the native libraries (rJava/JRI) -->
+  <property>
+      <name>dt.attr.CONTAINER_JVM_OPTIONS</name>
+      <value>-Djava.library.path=/usr/local/lib/R/site-library/rJava/jri/</value>
+  </property>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/r/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/r/src/test/resources/log4j.properties b/examples/r/src/test/resources/log4j.properties
new file mode 100755
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/r/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/pom.xml
----------------------------------------------------------------------
diff --git a/examples/sql/pom.xml b/examples/sql/pom.xml
new file mode 100644
index 0000000..7eb0f4d
--- /dev/null
+++ b/examples/sql/pom.xml
@@ -0,0 +1,102 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>malhar-examples-sql</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Apex Malhar SQL API Example</name>
+  <description>Apex example applications that use SQL APIs to construct a DAG</description>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-examples</artifactId>
+    <version>3.7.0-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.9.1</version>
+        <executions>
+          <execution>
+            <id>attach-artifacts</id>
+            <phase>package</phase>
+            <goals>
+              <goal>attach-artifact</goal>
+            </goals>
+            <configuration>
+              <artifacts>
+                <artifact>
+                  <file>target/${project.artifactId}-${project.version}.apa</file>
+                  <type>apa</type>
+                </artifact>
+              </artifacts>
+              <skipAttach>false</skipAttach>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>apex-engine</artifactId>
+      <version>${apex.core.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-sql</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <!-- For KafkaTest -->
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-kafka</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+      <version>0.9.0.0</version>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/sql/src/assemble/appPackage.xml b/examples/sql/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/sql/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java
----------------------------------------------------------------------
diff --git a/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java b/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java
new file mode 100644
index 0000000..80b997d
--- /dev/null
+++ b/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+import org.apache.apex.malhar.sql.SQLExecEnvironment;
+import org.apache.apex.malhar.sql.table.CSVMessageFormat;
+import org.apache.apex.malhar.sql.table.FileEndpoint;
+import org.apache.apex.malhar.sql.table.StreamEndpoint;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.ImmutableMap;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.contrib.parser.CsvParser;
+
+
+/**
+ * Example application that mixes hand-wired operators with a SQL statement: a Kafka input
+ * operator feeds a CSV parser, the parser's output port is registered as the SQL input
+ * table, a file endpoint is registered as the SQL output table, and the statement read
+ * from the {@code sql} configuration property is added to the DAG.
+ *
+ * <p>Configuration properties read: {@code sqlSchemaInputName}, {@code sqlSchemaOutputName},
+ * {@code folderPath}, {@code fileName}, {@code sqlSchemaOutputDef} and {@code sql}.
+ *
+ * @since 3.6.0
+ */
+@ApplicationAnnotation(name = "FusionStyleSQLApplication")
+public class FusionStyleSQLApplication implements StreamingApplication
+{
+  /**
+   * Wires the Kafka input and CSV parser operators, registers the SQL tables and function,
+   * and adds the configured SQL statement to the DAG.
+   *
+   * @param dag  the DAG to populate
+   * @param conf configuration supplying the table names, output location, schema and SQL text
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    SQLExecEnvironment env = SQLExecEnvironment.getEnvironment();
+    env.registerFunction("APEXCONCAT", PureStyleSQLApplication.class, "apex_concat_str");
+
+    // Column name -> Java type of the tuples leaving the CSV parser
+    Map<String, Class> fieldMapping = ImmutableMap.<String, Class>of(
+        "RowTime", Date.class,
+        "id", Integer.class,
+        "Product", String.class,
+        "units", Integer.class);
+
+    // Add Kafka Input
+    KafkaSinglePortInputOperator kafkaInput = dag.addOperator("KafkaInput", KafkaSinglePortInputOperator.class);
+    kafkaInput.setInitialOffset("EARLIEST");
+
+    // Add CSVParser
+    CsvParser csvParser = dag.addOperator("CSVParser", CsvParser.class);
+    dag.addStream("KafkaToCSV", kafkaInput.outputPort, csvParser.in);
+
+    // Register CSV Parser output as the input table of the SQL statement
+    env.registerTable(conf.get("sqlSchemaInputName"), new StreamEndpoint(csvParser.out, fieldMapping));
+
+    // Register FileEndpoint as the output table of the SQL statement
+    env.registerTable(conf.get("sqlSchemaOutputName"), new FileEndpoint(conf.get("folderPath"),
+        conf.get("fileName"), new CSVMessageFormat(conf.get("sqlSchemaOutputDef"))));
+
+    // Add the SQL statement to the DAG
+    env.executeSQL(dag, conf.get("sql"));
+  }
+
+  /**
+   * Operator that forwards every tuple received on {@code input} unchanged to {@code output}.
+   */
+  public static class PassThroughOperator extends BaseOperator
+  {
+    public final transient DefaultOutputPort output = new DefaultOutputPort();
+    public final transient DefaultInputPort input = new DefaultInputPort()
+    {
+      @Override
+      public void process(Object o)
+      {
+        // Forward the received tuple; previously the output port object itself was emitted.
+        output.emit(o);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java
----------------------------------------------------------------------
diff --git a/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java b/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java
new file mode 100644
index 0000000..79295f9
--- /dev/null
+++ b/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import org.apache.apex.malhar.sql.SQLExecEnvironment;
+import org.apache.apex.malhar.sql.table.CSVMessageFormat;
+import org.apache.apex.malhar.sql.table.FileEndpoint;
+import org.apache.apex.malhar.sql.table.KafkaEndpoint;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/**
+ * Example application whose DAG is produced entirely from a SQL statement: a Kafka endpoint
+ * is registered as the input table, a file endpoint as the output table, and the statement
+ * from the {@code sql} configuration property is executed against them.
+ *
+ * @since 3.6.0
+ */
+@ApplicationAnnotation(name = "PureStyleSQLApplication")
+public class PureStyleSQLApplication implements StreamingApplication
+{
+  /**
+   * Registers the input/output tables and the {@code APEXCONCAT} function, then adds the
+   * configured SQL statement to the DAG.
+   *
+   * @param dag  the DAG to populate
+   * @param conf configuration supplying table names, schemas, endpoints and the SQL text
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // Input table: CSV messages from a Kafka topic
+    final KafkaEndpoint kafkaSource = new KafkaEndpoint(conf.get("broker"), conf.get("topic"),
+        new CSVMessageFormat(conf.get("schemaInDef")));
+
+    // Output table: CSV records written to a file
+    final FileEndpoint fileSink = new FileEndpoint(conf.get("outputFolder"), conf.get("destFileName"),
+        new CSVMessageFormat(conf.get("schemaOutDef")));
+
+    SQLExecEnvironment.getEnvironment()
+        .registerTable(conf.get("schemaInName"), kafkaSource)
+        .registerTable(conf.get("schemaOutName"), fileSink)
+        .registerFunction("APEXCONCAT", getClass(), "apex_concat_str")
+        .executeSQL(dag, conf.get("sql"));
+  }
+
+  /**
+   * Function registered under the SQL name {@code APEXCONCAT}; concatenates its two arguments.
+   *
+   * @param s1 left-hand string
+   * @param s2 right-hand string
+   * @return {@code s1} followed by {@code s2}
+   */
+  public static String apex_concat_str(String s1, String s2)
+  {
+    final String joined = s1 + s2;
+    return joined;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java
----------------------------------------------------------------------
diff --git a/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java b/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java
new file mode 100644
index 0000000..da4f563
--- /dev/null
+++ b/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import org.apache.apex.malhar.sql.SQLExecEnvironment;
+import org.apache.apex.malhar.sql.table.CSVMessageFormat;
+import org.apache.apex.malhar.sql.table.FileEndpoint;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/**
+ * Example application that registers a CSV file endpoint as a SQL table through the
+ * {@link SQLExecEnvironment} API and executes the statement from the {@code sql}
+ * configuration property.
+ *
+ * @since 3.6.0
+ */
+@ApplicationAnnotation(name = "SQLApplicationWithAPI")
+public class SQLApplicationWithAPI implements StreamingApplication
+{
+  /**
+   * Registers the source table and adds the configured SQL statement to the DAG.
+   *
+   * @param dag  the DAG to populate
+   * @param conf configuration supplying {@code csvSchemaInName}, {@code csvSchemaIn},
+   *             {@code sourceFile} and {@code sql}
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // Source table: the file named by "sourceFile", parsed with the "csvSchemaIn" CSV schema
+    final FileEndpoint csvSource =
+        new FileEndpoint(conf.get("sourceFile"), new CSVMessageFormat(conf.get("csvSchemaIn")));
+
+    SQLExecEnvironment.getEnvironment()
+        .registerTable(conf.get("csvSchemaInName"), csvSource)
+        .executeSQL(dag, conf.get("sql"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java
----------------------------------------------------------------------
diff --git a/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java b/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java
new file mode 100644
index 0000000..4c90a82
--- /dev/null
+++ b/examples/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.apex.malhar.sql.SQLExecEnvironment;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/**
+ * Example application that loads a SQL model definition from the file named by the
+ * {@code modelFile} configuration property and executes the statement from the
+ * {@code sql} property against that model.
+ *
+ * @since 3.6.0
+ */
+@ApplicationAnnotation(name = "SQLApplicationWithModelFile")
+public class SQLApplicationWithModelFile implements StreamingApplication
+{
+  /**
+   * Reads the model file, hands its contents to the SQL environment and adds the
+   * configured SQL statement to the DAG.
+   *
+   * @param dag  the DAG to populate
+   * @param conf configuration supplying {@code modelFile} and {@code sql}
+   * @throws RuntimeException if the model file cannot be read (wraps the {@link IOException})
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    final String modelFile = conf.get("modelFile");
+    final String model;
+    try {
+      // Read with an explicit charset instead of the platform default so the model
+      // is decoded identically on every host.
+      model = FileUtils.readFileToString(new File(modelFile), "UTF-8");
+    } catch (IOException e) {
+      // Keep the cause and name the offending file to make misconfiguration easy to spot.
+      throw new RuntimeException("Unable to read SQL model file: " + modelFile, e);
+    }
+
+    SQLExecEnvironment.getEnvironment()
+        .withModel(model)
+        .executeSQL(dag, conf.get("sql"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml
----------------------------------------------------------------------
diff --git a/examples/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml b/examples/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml
new file mode 100644
index 0000000..77852e7
--- /dev/null
+++ b/examples/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <!-- Kafka Operator Properties -->
+  <property>
+    <name>dt.operator.KafkaInput.prop.topics</name>
+    <value>dataTopic</value>
+  </property>
+  <property>
+    <name>dt.operator.KafkaInput.prop.clusters</name>
+    <value>localhost:9092</value>  <!-- broker (NOT zookeeper) address -->
+  </property>
+
+  <!-- CSV Parser Properties -->
+  <property>
+    <name>dt.operator.CSVParser.prop.schema</name>
+    <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value>
+  </property>
+
+  <!-- SQL Properties -->
+  <property>
+    <name>sqlSchemaInputName</name>
+    <value>FROMCSV</value>
+  </property>
+  <property>
+    <name>sqlSchemaOutputName</name>
+    <value>TOFILE</value>
+  </property>
+  <property>
+    <name>folderPath</name>
+    <value>/tmp/output</value>
+  </property>
+  <property>
+    <name>fileName</name>
+    <value>output.txt</value>
+  </property>
+  <property>
+    <name>sqlSchemaOutputDef</name>
+    <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime1","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"RowTime2","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"Product","type":"String"}]}</value>
+  </property>
+  <property>
+    <name>sql</name>
+    <value>INSERT INTO TOFILE SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) FROM FROMCSV WHERE ID > 3 AND PRODUCT LIKE 'paint%'</value>
+  </property>
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml
----------------------------------------------------------------------
diff --git a/examples/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml b/examples/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml
new file mode 100644
index 0000000..0d25aa6
--- /dev/null
+++ b/examples/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <!-- Input Definition -->
+  <property>
+    <name>schemaInName</name>
+    <value>ORDERS</value>
+  </property>
+  <property>
+    <name>schemaInDef</name>
+    <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value>
+  </property>
+  <property>
+    <name>broker</name>
+    <value>localhost:9090</value>  <!-- NOTE(review): Kafka brokers listen on 9092 by default; confirm 9090 is intended -->
+  </property>
+  <property>
+    <name>topic</name>
+    <value>inputTopic</value>
+  </property>
+
+  <!-- Output Definition -->
+  <property>
+    <name>schemaOutName</name>
+    <value>SALES</value>
+  </property>
+  <property>
+    <name>schemaOutDef</name>
+    <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime1","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"RowTime2","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"Product","type":"String"}]}</value>
+  </property>
+  <property>
+    <name>outputFolder</name>
+    <value>/tmp/output</value>
+  </property>
+  <property>
+    <name>destFileName</name>
+    <value>out.file</value>
+  </property>
+
+  <!-- Execution SQL -->
+  <property>
+    <name>sql</name>
+    <value>INSERT INTO SALES SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) FROM ORDERS WHERE ID > 3 AND PRODUCT LIKE 'paint%'</value>
+  </property>
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml
----------------------------------------------------------------------
diff --git a/examples/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml b/examples/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml
new file mode 100644
index 0000000..9ac49d4
--- /dev/null
+++ b/examples/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <!-- Input Definition -->
+  <property>
+    <name>csvSchemaInName</name>
+    <value>ORDERS</value>
+  </property>
+  <property>
+    <name>csvSchemaIn</name>
+    <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value>
+  </property>
+  <property>
+    <name>sourceFile</name>
+    <value>src/test/resources/input.csv</value>
+  </property>
+
+  <!-- Execution SQL -->
+  <property>
+    <name>sql</name>
+    <value>SELECT STREAM ROWTIME, PRODUCT FROM ORDERS</value>
+  </property>
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml
----------------------------------------------------------------------
diff --git a/examples/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml b/examples/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml
new file mode 100644
index 0000000..ab026c2
--- /dev/null
+++ b/examples/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <property>
+    <name>modelFile</name>
+    <value>src/main/resources/model/model_file_csv.json</value>
+  </property>
+  <property>
+    <name>sql</name>
+    <value>SELECT STREAM ROWTIME, PRODUCT FROM ORDERS</value>
+  </property>
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/sql/src/main/resources/META-INF/properties.xml b/examples/sql/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..2702315
--- /dev/null
+++ b/examples/sql/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <!-- Memory settings for all examples -->
+  <property>
+    <name>dt.attr.MASTER_MEMORY_MB</name>
+    <value>512</value>
+  </property>
+  <property>
+    <name>dt.application.*.operator.*.attr.MEMORY_MB</name>
+    <value>256</value>
+  </property>
+  <property>
+    <name>dt.application.*.operator.*.attr.JVM_OPTIONS</name>
+    <value>-Xmx128M</value>
+  </property>
+  <property>
+    <name>dt.application.*.operator.*.port.*.attr.BUFFER_MEMORY_MB</name>
+    <value>128</value>
+  </property>
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/main/resources/model/model_file_csv.json
----------------------------------------------------------------------
diff --git a/examples/sql/src/main/resources/model/model_file_csv.json b/examples/sql/src/main/resources/model/model_file_csv.json
new file mode 100644
index 0000000..beba18d
--- /dev/null
+++ b/examples/sql/src/main/resources/model/model_file_csv.json
@@ -0,0 +1,27 @@
+{
+  "version": "1.0",
+  "defaultSchema": "APEX",
+  "schemas": [{
+    "name": "APEX",
+    "tables": [
+      {
+        "name": "ORDERS",
+        "type": "custom",
+        "factory": "org.apache.apex.malhar.sql.schema.ApexSQLTableFactory",
+        "stream": {
+        "stream": true
+        },
+        "operand": {
+          "endpoint": "file",
+          "messageFormat": "csv",
+          "endpointOperands": {
+            "directory": "src/test/resources/input.csv"
+          },
+          "messageFormatOperands": {
+            "schema": "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss\"}},{\"name\":\"id\",\"type\":\"Integer\"},{\"name\":\"Product\",\"type\":\"String\"},{\"name\":\"units\",\"type\":\"Integer\"}]}"
+          }
+        }
+      }
+    ]
+  }]
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java b/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java
new file mode 100644
index 0000000..7208701
--- /dev/null
+++ b/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import org.apache.apex.malhar.kafka.EmbeddedKafka;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/** Runs {@link FusionStyleSQLApplication} in local mode against an embedded Kafka broker. */
+public class FusionStyleSQLApplicationTest
+{
+  private static final String OUTPUT_ROOT = "target/output/";
+
+  private final String testTopicData = "dataTopic";
+  private final String testTopicResult = "resultTopic";
+
+  private TimeZone defaultTZ;
+  private EmbeddedKafka kafka;
+  // Per-test output folder; an instance field so set-up never keeps appending to a shared static path.
+  private String outputFolder;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  /** Pins the default time zone to GMT, starts Kafka with both topics and removes stale output. */
+  @Before
+  public void setUp() throws Exception
+  {
+    defaultTZ = TimeZone.getDefault();
+    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+
+    kafka = new EmbeddedKafka();
+    kafka.start();
+    kafka.createTopic(testTopicData);
+    kafka.createTopic(testTopicResult);
+
+    outputFolder = OUTPUT_ROOT + testName.getMethodName() + "/";
+    // Files left behind by an earlier run would satisfy the wait in test() before this run wrote anything.
+    FileUtils.deleteQuietly(new File(outputFolder));
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    kafka.stop();
+    TimeZone.setDefault(defaultTZ);
+  }
+
+  /**
+   * Publishes six CSV rows and expects the output file to hold the rows derived from paint4 and paint5.
+   * Any exception propagates and fails the test.
+   */
+  @Test
+  public void test() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-FusionStyleSQLApplication.xml"));
+    conf.set("dt.operator.KafkaInput.prop.topics", testTopicData);
+    conf.set("dt.operator.KafkaInput.prop.clusters", kafka.getBroker());
+    conf.set("folderPath", outputFolder);
+    conf.set("fileName", "out.tmp");
+
+    lma.prepareDAG(new FusionStyleSQLApplication(), conf);
+    LocalMode.Controller lc = lma.getController();
+
+    lc.runAsync();
+    try {
+      kafka.publish(testTopicData, Arrays.asList("15/02/2016 10:15:00 +0000,1,paint1,11",
+          "15/02/2016 10:16:00 +0000,2,paint2,12",
+          "15/02/2016 10:17:00 +0000,3,paint3,13", "15/02/2016 10:18:00 +0000,4,paint4,14",
+          "15/02/2016 10:19:00 +0000,5,paint5,15", "15/02/2016 10:10:00 +0000,6,abcde6,16"));
+      Assert.assertTrue(PureStyleSQLApplicationTest.waitTillFileIsPopulated(outputFolder, 40000));
+    } finally {
+      // Shut the local cluster down even when publishing or the wait fails.
+      lc.shutdown();
+    }
+
+    File file = new File(outputFolder);
+    File file1 = new File(outputFolder + file.list()[0]);
+    List<String> strings = FileUtils.readLines(file1);
+
+    String[] expectedLines = new String[] {
+        "15/02/2016 10:18:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT4",
+        "",
+        "15/02/2016 10:19:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT5",
+        ""};
+    Assert.assertArrayEquals(expectedLines, strings.toArray(new String[strings.size()]));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java b/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java
new file mode 100644
index 0000000..f298059
--- /dev/null
+++ b/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import org.apache.apex.malhar.kafka.EmbeddedKafka;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.LocalMode;
+
+
+public class PureStyleSQLApplicationTest
+{
+  private final String testTopicData = "dataTopic";
+  private final String testTopicResult = "resultTopic";
+
+  private TimeZone defaultTZ;
+  private EmbeddedKafka kafka;
+  private static String outputFolder = "target/output/";
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @Before
+  public void setUp() throws Exception
+  {
+    defaultTZ = TimeZone.getDefault();
+    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+
+    kafka = new EmbeddedKafka();
+    kafka.start();
+    kafka.createTopic(testTopicData);
+    kafka.createTopic(testTopicResult);
+
+    outputFolder += testName.getMethodName() + "/";
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    kafka.stop();
+    TimeZone.setDefault(defaultTZ);
+  }
+
+  @Test
+  public void test() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-PureStyleSQLApplication.xml"));
+
+    conf.set("broker", kafka.getBroker());
+    conf.set("topic", testTopicData);
+    conf.set("outputFolder", outputFolder);
+    conf.set("destFileName", "out.tmp");
+
+    PureStyleSQLApplication app = new PureStyleSQLApplication();
+
+    lma.prepareDAG(app, conf);
+
+    LocalMode.Controller lc = lma.getController();
+
+    lc.runAsync();
+    kafka.publish(testTopicData, Arrays.asList(
+        "15/02/2016 10:15:00 +0000,1,paint1,11",
+        "15/02/2016 10:16:00 +0000,2,paint2,12",
+        "15/02/2016 10:17:00 +0000,3,paint3,13",
+        "15/02/2016 10:18:00 +0000,4,paint4,14",
+        "15/02/2016 10:19:00 +0000,5,paint5,15",
+        "15/02/2016 10:10:00 +0000,6,abcde6,16"));
+
+    Assert.assertTrue(waitTillFileIsPopulated(outputFolder, 40000));
+    lc.shutdown();
+
+    File file = new File(outputFolder);
+    File file1 = new File(outputFolder + file.list()[0]);
+    List<String> strings = FileUtils.readLines(file1);
+
+    String[] actualLines = strings.toArray(new String[strings.size()]);
+
+    String[] expectedLines = new String[]{
+        "15/02/2016 10:18:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT4",
+        "",
+        "15/02/2016 10:19:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT5",
+        ""};
+
+    Assert.assertEquals(expectedLines.length, actualLines.length);
+    for (int i = 0;i < expectedLines.length; i++) {
+      Assert.assertEquals(expectedLines[i], actualLines[i]);
+    }
+  }
+
+  public static boolean waitTillFileIsPopulated(String outputFolder, int timeout) throws IOException, InterruptedException
+  {
+    boolean result;
+    long now = System.currentTimeMillis();
+    Path outDir = new Path("file://" + new File(outputFolder).getAbsolutePath());
+    try (FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration())) {
+      List<String> strings = Lists.newArrayList();
+      while (System.currentTimeMillis() - now < timeout) {
+        if (fs.exists(outDir)) {
+          File file = new File(outputFolder);
+          if (file.list().length > 0) {
+            File file1 = new File(outputFolder + file.list()[0]);
+            strings = FileUtils.readLines(file1);
+            if (strings.size() != 0) {
+              break;
+            }
+          }
+        }
+
+        Thread.sleep(500);
+      }
+
+      result = fs.exists(outDir) && (strings.size() != 0);
+    }
+
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java
----------------------------------------------------------------------
diff --git a/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java b/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java
new file mode 100644
index 0000000..6b1a404
--- /dev/null
+++ b/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.TimeZone;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.Collections2;
+
+import com.datatorrent.api.LocalMode;
+
+
+/** Runs {@link SQLApplicationWithAPI} in local mode and checks the records it prints to standard output. */
+public class SQLApplicationWithAPITest
+{
+  private TimeZone defaultTZ;
+
+  @Before
+  public void setUp() throws Exception
+  {
+    defaultTZ = TimeZone.getDefault();
+    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    TimeZone.setDefault(defaultTZ);
+  }
+
+  /** Captures standard output during the run and checks the first six "Delta Record:" lines in order. */
+  @Test
+  public void test() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SQLApplicationWithAPI.xml"));
+    lma.prepareDAG(new SQLApplicationWithAPI(), conf);
+    LocalMode.Controller lc = lma.getController();
+
+    PrintStream originalSysout = System.out;
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(baos));
+    try {
+      lc.runAsync();
+      SQLApplicationWithModelFileTest.waitTillStdoutIsPopulated(baos, 30000);
+      lc.shutdown();
+    } finally {
+      // Restore the real stream even on failure so later output is not swallowed by the buffer.
+      System.setOut(originalSysout);
+    }
+
+    String[] sout = baos.toString().split(System.lineSeparator());
+    Collection<String> filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:"));
+    String[] actualLines = filter.toArray(new String[filter.size()]);
+    Assert.assertTrue("expected 6 Delta Record lines, got " + actualLines.length, actualLines.length >= 6);
+    Assert.assertTrue(actualLines[0].contains("RowTime=Mon Feb 15 10:15:00 GMT 2016, Product=paint1"));
+    Assert.assertTrue(actualLines[1].contains("RowTime=Mon Feb 15 10:16:00 GMT 2016, Product=paint2"));
+    Assert.assertTrue(actualLines[2].contains("RowTime=Mon Feb 15 10:17:00 GMT 2016, Product=paint3"));
+    Assert.assertTrue(actualLines[3].contains("RowTime=Mon Feb 15 10:18:00 GMT 2016, Product=paint4"));
+    Assert.assertTrue(actualLines[4].contains("RowTime=Mon Feb 15 10:19:00 GMT 2016, Product=paint5"));
+    Assert.assertTrue(actualLines[5].contains("RowTime=Mon Feb 15 10:10:00 GMT 2016, Product=abcde6"));
+  }
+}


Mime
View raw message