apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [18/30] apex-malhar git commit: Renamed demos to examples. Packages and artifactid names are changed as suggested.
Date Tue, 07 Mar 2017 06:58:23 GMT
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java
deleted file mode 100644
index 7bbb8ec..0000000
--- a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.sql.sample;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.TimeZone;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.base.Predicates;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
-
-import com.datatorrent.api.LocalMode;
-
-public class SQLApplicationWithModelFileTest
-{
-  private TimeZone defaultTZ;
-
-  @Before
-  public void setUp() throws Exception
-  {
-    defaultTZ = TimeZone.getDefault();
-    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
-  }
-
-  @After
-  public void tearDown() throws Exception
-  {
-    TimeZone.setDefault(defaultTZ);
-  }
-
-  @Test
-  public void test() throws Exception
-  {
-    LocalMode lma = LocalMode.newInstance();
-    Configuration conf = new Configuration(false);
-    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
-    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SQLApplicationWithModelFile.xml"));
-
-    SQLApplicationWithModelFile app = new SQLApplicationWithModelFile();
-
-    lma.prepareDAG(app, conf);
-
-    LocalMode.Controller lc = lma.getController();
-
-    PrintStream originalSysout = System.out;
-    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    System.setOut(new PrintStream(baos));
-
-    lc.runAsync();
-    waitTillStdoutIsPopulated(baos, 30000);
-    lc.shutdown();
-
-    System.setOut(originalSysout);
-
-    String[] sout = baos.toString().split(System.lineSeparator());
-    Collection<String> filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:"));
-
-    String[] actualLines = filter.toArray(new String[filter.size()]);
-    Assert.assertTrue(actualLines[0].contains("RowTime=Mon Feb 15 10:15:00 GMT 2016, Product=paint1"));
-    Assert.assertTrue(actualLines[1].contains("RowTime=Mon Feb 15 10:16:00 GMT 2016, Product=paint2"));
-    Assert.assertTrue(actualLines[2].contains("RowTime=Mon Feb 15 10:17:00 GMT 2016, Product=paint3"));
-    Assert.assertTrue(actualLines[3].contains("RowTime=Mon Feb 15 10:18:00 GMT 2016, Product=paint4"));
-    Assert.assertTrue(actualLines[4].contains("RowTime=Mon Feb 15 10:19:00 GMT 2016, Product=paint5"));
-    Assert.assertTrue(actualLines[5].contains("RowTime=Mon Feb 15 10:10:00 GMT 2016, Product=abcde6"));
-  }
-
-  public static boolean waitTillStdoutIsPopulated(ByteArrayOutputStream baos, int timeout) throws InterruptedException,
-    IOException
-  {
-    long now = System.currentTimeMillis();
-    Collection<String> filter = Lists.newArrayList();
-    while (System.currentTimeMillis() - now < timeout) {
-      baos.flush();
-      String[] sout = baos.toString().split(System.lineSeparator());
-      filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:"));
-      if (filter.size() != 0) {
-        break;
-      }
-
-      Thread.sleep(500);
-    }
-
-    return (filter.size() != 0);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/test/resources/input.csv
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/resources/input.csv b/demos/sql/src/test/resources/input.csv
deleted file mode 100644
index c4786d1..0000000
--- a/demos/sql/src/test/resources/input.csv
+++ /dev/null
@@ -1,6 +0,0 @@
-15/02/2016 10:15:00 +0000,1,paint1,11
-15/02/2016 10:16:00 +0000,2,paint2,12
-15/02/2016 10:17:00 +0000,3,paint3,13
-15/02/2016 10:18:00 +0000,4,paint4,14
-15/02/2016 10:19:00 +0000,5,paint5,15
-15/02/2016 10:10:00 +0000,6,abcde6,16

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/resources/log4j.properties b/demos/sql/src/test/resources/log4j.properties
deleted file mode 100644
index 8ea3cfe..0000000
--- a/demos/sql/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,50 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-log4j.rootLogger=DEBUG,CONSOLE
-
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.CONSOLE.threshold=WARN
-test.log.console.threshold=WARN
-
-log4j.appender.RFA=org.apache.log4j.RollingFileAppender
-log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
-log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.RFA.File=/tmp/app.log
-
-# to enable, add SYSLOG to rootLogger
-log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
-log4j.appender.SYSLOG.syslogHost=127.0.0.1
-log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
-log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
-log4j.appender.SYSLOG.Facility=LOCAL1
-
-log4j.logger.org=info
-#log4j.logger.org.apache.commons.beanutils=warn
-log4j.logger.com.datatorrent=INFO
-log4j.logger.org.apache.apex=INFO
-
-log4j.logger.org.apache.calcite=WARN
-log4j.logger.org.apache.kafka=WARN
-log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
-log4j.logger.org.apache.zookeeper=WARN
-log4j.logger.kafka=WARN
-log4j.logger.kafka.consumer=WARN

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/pom.xml
----------------------------------------------------------------------
diff --git a/demos/twitter/pom.xml b/demos/twitter/pom.xml
deleted file mode 100644
index 767d809..0000000
--- a/demos/twitter/pom.xml
+++ /dev/null
@@ -1,101 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <artifactId>twitter-demo</artifactId>
-  <packaging>jar</packaging>
-
-  <name>Apache Apex Malhar Twitter Demo</name>
-  <description>Twitter Rolling Top Words application demonstrates real-time computations over a sliding window.</description>
-
-  <parent>
-    <groupId>org.apache.apex</groupId>
-    <artifactId>malhar-demos</artifactId>
-    <version>3.7.0-SNAPSHOT</version>
-  </parent>
-
-  <properties>
-    <skipTests>true</skipTests>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <!-- required by twitter demo -->
-      <groupId>org.twitter4j</groupId>
-      <artifactId>twitter4j-core</artifactId>
-      <version>4.0.6</version>
-    </dependency>
-    <dependency>
-      <!-- required by twitter demo -->
-      <groupId>org.twitter4j</groupId>
-      <artifactId>twitter4j-stream</artifactId>
-      <version>4.0.6</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase</artifactId>
-      <version>0.94.20</version>
-      <type>jar</type>
-      <exclusions>
-        <exclusion>
-          <groupId>*</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>mysql</groupId>
-      <artifactId>mysql-connector-java</artifactId>
-      <version>5.1.22</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.apex</groupId>
-      <artifactId>malhar-contrib</artifactId>
-      <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>*</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>com.amazonaws</groupId>
-      <artifactId>aws-java-sdk-kinesis</artifactId>
-      <version>1.9.10</version>
-    </dependency>
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-core</artifactId>
-      <version>2.4.4</version>
-      <scope>runtime</scope>
-    </dependency>
-    <dependency>
-      <groupId>it.unimi.dsi</groupId>
-      <artifactId>fastutil</artifactId>
-      <version>6.6.4</version>
-    </dependency>
-  </dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/demos/twitter/src/assemble/appPackage.xml b/demos/twitter/src/assemble/appPackage.xml
deleted file mode 100644
index 4138cf2..0000000
--- a/demos/twitter/src/assemble/appPackage.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
-  <id>appPackage</id>
-  <formats>
-    <format>jar</format>
-  </formats>
-  <includeBaseDirectory>false</includeBaseDirectory>
-  <fileSets>
-    <fileSet>
-      <directory>${basedir}/target/</directory>
-      <outputDirectory>/app</outputDirectory>
-      <includes>
-        <include>${project.artifactId}-${project.version}.jar</include>
-      </includes>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/target/deps</directory>
-      <outputDirectory>/lib</outputDirectory>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/src/site/conf</directory>
-      <outputDirectory>/conf</outputDirectory>
-      <includes>
-        <include>*.xml</include>
-      </includes>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/src/main/resources/META-INF</directory>
-      <outputDirectory>/META-INF</outputDirectory>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/src/main/resources/app</directory>
-      <outputDirectory>/app</outputDirectory>
-    </fileSet>
-  </fileSets>
-
-</assembly>
-

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java
deleted file mode 100644
index b9d32ab..0000000
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.twitter;
-
-import java.net.URI;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.Operator.InputPort;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.kinesis.AbstractKinesisInputOperator;
-import com.datatorrent.contrib.kinesis.KinesisStringInputOperator;
-import com.datatorrent.contrib.kinesis.KinesisStringOutputOperator;
-import com.datatorrent.contrib.kinesis.ShardManager;
-import com.datatorrent.contrib.twitter.TwitterSampleInput;
-import com.datatorrent.lib.algo.UniqueCounter;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
-
-/**
- * Twitter Demo Application: <br>
- * This demo application samples random public status from twitter, send to Hashtag
- * extractor and extract the status and send it into kinesis <br>
- * Get the records from kinesis and converts into Hashtags. Top 10 Hashtag(s) mentioned in
- * tweets in last 5 mins are displayed on every window count (500ms).<br>
- * <br>
- *
- * Real Time Calculation :<br>
- * This application calculates top 10 Hashtag mentioned in tweets in last 5
- * minutes across a 1% random tweet sampling on a rolling window basis.<br>
- * <br>
- * Before running this application, you need to have a <a href="https://dev.twitter.com/apps">Twitter API account</a>,
- * <a href="https://http://aws.amazon.com/">AWS Account</a> and configure the authentication details.
- * For launch from CLI, those go into ~/.dt/dt-site.xml:
- * <pre>
- * {@code
- * <?xml version="1.0" encoding="UTF-8"?>
- * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
- * <configuration>
- *
- *   <property> <name>dt.operator.TweetSampler.prop.consumerKey</name>
- *   <value>TBD</value> </property>
- *
- *   <property> <name>dt.operator.TweetSampler.prop.consumerSecret</name>
- *   <value>TBD</value> </property>
- *
- *   <property> <name>dt.operator.TweetSampler.prop.accessToken</name>
- *   <value>TBD</value> </property>
- *
- *   <property> <name>dt.operator.TweetSampler.prop.accessTokenSecret</name>
- *   <value>TBD</value> </property>
- *
- *   <property> <name>dt.operator.FromKinesis.streamName</name>
- *   <value>TBD</value> </property>
- *
- *   <property> <name>dt.operator.FromKinesis.accessKey</name>
- *   <value>TBD</value> </property>
- *
- *   <property> <name>dt.operator.FromKinesis.secretKey</name>
- *   <value>TBD</value> </property>
- *
- *   <property> <name>dt.operator.ToKinesis.streamName</name>
- *   <value>TBD</value> </property>
- *
- *   <property> <name>dt.operator.ToKinesis.accessKey</name>
- *   <value>TBD</value> </property>
- *
- *   <property> <name>dt.operator.ToKinesis.secretKey</name>
- *   <value>TBD</value> </property>
- *
- * </configuration>
- * }
- * </pre>
- * Custom Attributes: <br>
- * <b>topCounts operator : <b>
- * <ul>
- * <li>Top Count : 10, number of top unique Hashtag to be reported.</li>
- * <li>Sliding window count : 600, report over last 5 min (600 * .5 / 60 mins)</li>
- * <li>window slide value : 1</li>
- * </ul>
- * <p>
- * Running Java Test or Main app in IDE:
- *
- * <pre>
- * LocalMode.runApp(new Application(), 600000); // 10 min run
- * </pre>
- *
- * Run Success : <br>
- * For successful deployment and run, user should see similar output on console as below:
- *
- * <pre>
- * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"مع_الله\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
- * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"مع_الله\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
- * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"مع_الله\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
- * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"مع_الله\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
- * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"مع_الله\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
- * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"مع_الله\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
- * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"مع_الله\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
- * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"مع_الله\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
- * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"مع_الله\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
- * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"مع_الله\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
- * 2013-06-17 14:38:55,201 [main] INFO  stram.StramLocalCluster run - Application finished.
- * 2013-06-17 14:38:55,201 [container-2] INFO  stram.StramChild processHeartbeatResponse - Received shutdown request
- * </pre>
- *
- * Scaling Options : <br>
- * User can scale application by setting intial partition size > 1 on count
- * unique operator. <br>
- * <br>
- *
- * Application DAG : <br>
- * <img src="doc-files/Application.gif" width=600px > <br>
- * <br>
- *
- * Streaming Window Size : 500ms(default) <br>
- * Operator Details : <br>
- * <ul>
- * <li><b>The twitterFeed operator : </b> This operator samples random public
- * statues from twitter and emits to application. <br>
- * Class : com.datatorrent.demos.twitter.TwitterSampleInput <br>
- * StateFull : No, window count 1 <br>
- * </li>
- * <li><b>The HashtagExtractor operator : </b> This operator extracts Hashtag from
- * random sampled statues from twitter. <br>
- * Class : {@link com.datatorrent.demos.twitter.TwitterStatusHashtagExtractor} <br>
- * StateFull : No, window count 1 <br>
- * </li>
- * <li><b>The outputOp operator : </b> This operator sent the tags into the kinesis. <br>
- * Class : {@link com.datatorrent.contrib.kinesis.KinesisStringOutputOperator} <br>
- * </li>
- * <li><b>The inputOp operator : </b> This operator fetches the records from kinesis and
- * converts into hastags and emits them. <br>
- * Class : {@link com.datatorrent.contrib.kinesis.KinesisStringOutputOperator} <br>
- * </li>
- * <li><b>The uniqueCounter operator : </b> This operator aggregates count for each
- * Hashtag extracted from random samples. <br>
- * Class : {@link com.datatorrent.lib.algo.UniqueCounter} <br>
- * StateFull : No, window count 1 <br>
- * </li>
- * <li><b> The topCounts operator : </b> This operator caluculates top Hashtag in last 1
- * min sliding window count 1. <br>
- * Class : com.datatorrent.lib.algo.WindowedTopCounter <br>
- * StateFull : Yes, sliding window count 120 (1 min) <br>
- * </li>
- * <li><b>The operator Console: </b> This operator just outputs the input tuples
- * to the console (or stdout). <br>
- * </li>
- * </ul>
- *
- * @since 2.0.0
- */
-@ApplicationAnnotation(name = "TwitterKinesisDemo")
-public class KinesisHashtagsApplication implements StreamingApplication
-{
-  private final Locality locality = null;
-
-  private InputPort<Object> consoleOutput(DAG dag, String operatorName)
-  {
-    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-    if (!StringUtils.isEmpty(gatewayAddress)) {
-      URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
-      String topic = "demos.twitter." + operatorName;
-      //LOG.info("WebSocket with gateway at: {}", gatewayAddress);
-      PubSubWebSocketOutputOperator<Object> wsOut = dag.addOperator(operatorName, new PubSubWebSocketOutputOperator<Object>());
-      wsOut.setUri(uri);
-      wsOut.setTopic(topic);
-      return wsOut.input;
-    }
-    ConsoleOutputOperator operator = dag.addOperator(operatorName, new ConsoleOutputOperator());
-    operator.setStringFormat(operatorName + ": %s");
-    return operator.input;
-  }
-
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    // Setup the operator to get the data from twitter sample stream injected into the system.
-    TwitterSampleInput twitterFeed = new TwitterSampleInput();
-    twitterFeed = dag.addOperator("TweetSampler", twitterFeed);
-
-    //  Setup the operator to get the Hashtags extracted from the twitter statuses
-    TwitterStatusHashtagExtractor HashtagExtractor = dag.addOperator("HashtagExtractor", TwitterStatusHashtagExtractor.class);
-
-    //Setup the operator send the twitter statuses to kinesis
-    KinesisStringOutputOperator outputOp = dag.addOperator("ToKinesis", new KinesisStringOutputOperator());
-    outputOp.setBatchSize(500);
-
-    // Feed the statuses from feed into the input of the Hashtag extractor.
-    dag.addStream("TweetStream", twitterFeed.status, HashtagExtractor.input).setLocality(Locality.CONTAINER_LOCAL);
-    //  Start counting the Hashtags coming out of Hashtag extractor
-    dag.addStream("SendToKinesis", HashtagExtractor.hashtags, outputOp.inputPort).setLocality(locality);
-
-    //------------------------------------------------------------------------------------------
-
-    KinesisStringInputOperator inputOp = dag.addOperator("FromKinesis", new KinesisStringInputOperator());
-    ShardManager shardStats = new ShardManager();
-    inputOp.setShardManager(shardStats);
-    inputOp.getConsumer().setRecordsLimit(600);
-    inputOp.setStrategy(AbstractKinesisInputOperator.PartitionStrategy.MANY_TO_ONE.toString());
-
-    // Setup a node to count the unique Hashtags within a window.
-    UniqueCounter<String> uniqueCounter = dag.addOperator("UniqueHashtagCounter", new UniqueCounter<String>());
-
-    // Get the aggregated Hashtag counts and count them over last 5 mins.
-    WindowedTopCounter<String> topCounts = dag.addOperator("TopCounter", new WindowedTopCounter<String>());
-    topCounts.setTopCount(10);
-    topCounts.setSlidingWindowWidth(600);
-    topCounts.setDagWindowWidth(1);
-
-    dag.addStream("TwittedHashtags", inputOp.outputPort, uniqueCounter.data).setLocality(locality);
-
-    // Count unique Hashtags
-    dag.addStream("UniqueHashtagCounts", uniqueCounter.count, topCounts.input).setLocality(locality);
-    // Count top 10
-    dag.addStream("TopHashtags", topCounts.output, consoleOutput(dag, "topHashtags")).setLocality(locality);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java
deleted file mode 100644
index 8b9f447..0000000
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.twitter;
-
-import java.io.Serializable;
-
-/**
- * Developed for a demo<br>
- *
- * @param <T> Type of object for which sliding window is being maintained.
- * @since 0.3.2
- */
-public class SlidingContainer<T> implements Serializable
-{
-  private static final long serialVersionUID = 201305291751L;
-  T identifier;
-  int totalCount;
-  int position;
-  int[] windowedCount;
-
-  @SuppressWarnings("unused")
-  private SlidingContainer()
-  {
-    /* needed for Kryo serialization */
-  }
-
-  public SlidingContainer(T identifier, int windowCount)
-  {
-    this.identifier = identifier;
-    this.totalCount = 0;
-    this.position = 0;
-    windowedCount = new int[windowCount];
-  }
-
-  public void adjustCount(int i)
-  {
-    windowedCount[position] += i;
-  }
-
-  public void slide()
-  {
-    int currentCount = windowedCount[position];
-    position = position == windowedCount.length - 1 ? 0 : position + 1;
-    totalCount += currentCount - windowedCount[position];
-    windowedCount[position] = 0;
-  }
-
-  @Override
-  public String toString()
-  {
-    return identifier + " => " + totalCount;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java
deleted file mode 100644
index 9edce64..0000000
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.twitter;
-
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import javax.annotation.Nonnull;
-
-import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
-import com.datatorrent.contrib.twitter.TwitterSampleInput;
-import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
-
-import twitter4j.Status;
-
-/**
- * An application which connects to Twitter Sample Input and stores all the
- * tweets with their usernames in a mysql database. Please review the docs
- * for TwitterTopCounterApplication to setup your twitter credentials. You
- * may also be able to change JDBCStore credentials using config file.
- *
- * You will also need to create appropriate database and tables with the
- * following schema, also included in mysql.sql in resources:
- * <pre>
- * DROP TABLE if exists tweets;
- * CREATE TABLE tweets (
- * window_id LONG NOT NULL,
- * creation_date DATE,
- * text VARCHAR(256) NOT NULL,
- * userid VARCHAR(40) NOT NULL,
- * KEY ( userid, creation_date)
- * );
- *
- * drop table if exists dt_window_id_tracker;
- * CREATE TABLE dt_window_id_tracker (
- * dt_application_id VARCHAR(100) NOT NULL,
- * dt_operator_id int(11) NOT NULL,
- * dt_window_id bigint NOT NULL,
- * UNIQUE (dt_application_id, dt_operator_id, dt_window_id)
- * )  ENGINE=MyISAM DEFAULT CHARSET=latin1;
- * </pre>
- *
- * @since 0.9.4
- */
-@ApplicationAnnotation(name = "TwitterDumpDemo")
-public class TwitterDumpApplication implements StreamingApplication
-{
-  public static class Status2Database extends AbstractJdbcTransactionableOutputOperator<Status>
-  {
-    public static final String INSERT_STATUS_STATEMENT = "insert into tweets (window_id, creation_date, text, userid) values (?, ?, ?, ?)";
-
-    public Status2Database()
-    {
-      store.setMetaTable("dt_window_id_tracker");
-      store.setMetaTableAppIdColumn("dt_application_id");
-      store.setMetaTableOperatorIdColumn("dt_operator_id");
-      store.setMetaTableWindowColumn("dt_window_id");
-    }
-
-    @Nonnull
-    @Override
-    protected String getUpdateCommand()
-    {
-      return INSERT_STATUS_STATEMENT;
-    }
-
-    @Override
-    protected void setStatementParameters(PreparedStatement statement, Status tuple) throws SQLException
-    {
-      statement.setLong(1, currentWindowId);
-
-      statement.setDate(2, new java.sql.Date(tuple.getCreatedAt().getTime()));
-      statement.setString(3, tuple.getText());
-      statement.setString(4, tuple.getUser().getScreenName());
-      statement.addBatch();
-    }
-  }
-
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    //dag.setAttribute(DAGContext.APPLICATION_NAME, "TweetsDump");
-
-    TwitterSampleInput twitterStream = dag.addOperator("TweetSampler", new TwitterSampleInput());
-
-    //ConsoleOutputOperator dbWriter = dag.addOperator("DatabaseWriter", new ConsoleOutputOperator());
-
-    Status2Database dbWriter = dag.addOperator("DatabaseWriter", new Status2Database());
-    dbWriter.getStore().setDatabaseDriver("com.mysql.jdbc.Driver");
-    dbWriter.getStore().setDatabaseUrl("jdbc:mysql://node6.morado.com:3306/twitter");
-    dbWriter.getStore().setConnectionProperties("user:twitter");
-
-    dag.addStream("Statuses", twitterStream.status, dbWriter.input).setLocality(Locality.CONTAINER_LOCAL);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java
deleted file mode 100644
index 3adbbe0..0000000
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.twitter;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Put;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
-import com.datatorrent.contrib.hbase.AbstractHBasePutOutputOperator;
-import com.datatorrent.contrib.twitter.TwitterSampleInput;
-
-import twitter4j.Status;
-
-/**
- * An application which connects to Twitter Sample Input and stores all the
- * tweets with their usernames in a hbase database. Please review the docs
- * for TwitterTopCounterApplication to setup your twitter credentials.
- *
- * You need to create the HBase table to run this demo. Table name can be
- * configured but columnfamily must be 'cf' to make this demo simple and complied
- * with the mysql based demo.
- * create 'tablename', 'cf'
- *
- * </pre>
- *
- * @since 1.0.2
- */
-@ApplicationAnnotation(name = "TwitterDumpHBaseDemo")
-public class TwitterDumpHBaseApplication implements StreamingApplication
-{
-
-  public static class Status2Hbase extends AbstractHBasePutOutputOperator<Status>
-  {
-
-    @Override
-    public Put operationPut(Status t)
-    {
-      Put put = new Put(ByteBuffer.allocate(8).putLong(t.getCreatedAt().getTime()).array());
-      put.add("cf".getBytes(), "text".getBytes(), t.getText().getBytes());
-      put.add("cf".getBytes(), "userid".getBytes(), t.getText().getBytes());
-      return put;
-    }
-
-  }
-
-
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    //dag.setAttribute(DAGContext.APPLICATION_NAME, "TweetsDump");
-
-    TwitterSampleInput twitterStream = dag.addOperator("TweetSampler", new TwitterSampleInput());
-
-    Status2Hbase hBaseWriter = dag.addOperator("DatabaseWriter", new Status2Hbase());
-
-    dag.addStream("Statuses", twitterStream.status, hBaseWriter.input).setLocality(Locality.CONTAINER_LOCAL);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java
deleted file mode 100644
index d22db40..0000000
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.twitter;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.common.util.BaseOperator;
-
-import twitter4j.HashtagEntity;
-import twitter4j.Status;
-
-/**
- * <p>TwitterStatusHashtagExtractor class.</p>
- *
- * @since 1.0.2
- */
-public class TwitterStatusHashtagExtractor extends BaseOperator
-{
-  public final transient DefaultOutputPort<String> hashtags = new DefaultOutputPort<String>();
-  public final transient DefaultInputPort<Status> input = new DefaultInputPort<Status>()
-  {
-    @Override
-    public void process(Status status)
-    {
-      HashtagEntity[] entities = status.getHashtagEntities();
-      if (entities != null) {
-        for (HashtagEntity he : entities) {
-          if (he != null) {
-            hashtags.emit(he.getText());
-          }
-        }
-      }
-    }
-
-  };
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java
deleted file mode 100644
index 6dbc436..0000000
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.twitter;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.common.util.BaseOperator;
-
-import twitter4j.Status;
-import twitter4j.URLEntity;
-
-/**
- * <p>TwitterStatusURLExtractor class.</p>
- *
- * @since 0.3.2
- */
-public class TwitterStatusURLExtractor extends BaseOperator
-{
-  public final transient DefaultOutputPort<String> url = new DefaultOutputPort<String>();
-  public final transient DefaultInputPort<Status> input = new DefaultInputPort<Status>()
-  {
-    @Override
-    public void process(Status status)
-    {
-      URLEntity[] entities = status.getURLEntities();
-      if (entities != null) {
-        for (URLEntity ue: entities) {
-          if (ue != null) { // see why we intermittently get NPEs
-            url.emit((ue.getExpandedURL() == null ? ue.getURL() : ue.getExpandedURL()).toString());
-          }
-        }
-      }
-    }
-  };
-
-  private static final Logger LOG = LoggerFactory.getLogger(TwitterStatusURLExtractor.class);
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java
deleted file mode 100644
index e05a37a..0000000
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.twitter;
-
-import java.util.Arrays;
-import java.util.HashSet;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.common.util.BaseOperator;
-
-/**
- * <p>TwitterStatusWordExtractor class.</p>
- *
- * @since 0.3.2
- */
-public class TwitterStatusWordExtractor extends BaseOperator
-{
-  public HashSet<String> filterList;
-
-  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
-  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
-  {
-    @Override
-    public void process(String text)
-    {
-      String[] strs = text.split(" ");
-      if (strs != null) {
-        for (String str : strs) {
-          if (str != null && !filterList.contains(str) ) {
-            output.emit(str);
-          }
-        }
-      }
-    }
-  };
-
-  @Override
-  public void setup(OperatorContext context)
-  {
-    this.filterList = new HashSet<String>(Arrays.asList(new String[]{"", " ","I","you","the","a","to","as","he","him","his","her","she","me","can","for","of","and","or","but",
-      "this","that","!",",",".",":","#","/","@","be","in","out","was","were","is","am","are","so","no","...","my","de","RT","on","que","la","i","your","it","have","with","?","when",
-      "up","just","do","at","&","-","+","*","\\","y","n","like","se","en","te","el","I'm"}));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
deleted file mode 100644
index 731a38f..0000000
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.twitter;
-
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.Maps;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.Operator.OutputPort;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
-import com.datatorrent.contrib.twitter.TwitterSampleInput;
-import com.datatorrent.lib.algo.UniqueCounter;
-import com.datatorrent.lib.appdata.schemas.SchemaUtils;
-import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;
-import com.datatorrent.lib.io.PubSubWebSocketAppDataResult;
-
-/**
- * Twitter Demo Application: <br>
- * This demo application samples random public status from twitter, send to url
- * extractor. <br>
- * Top 10 url(s) mentioned in tweets in last 5 mins are displayed on every
- * window count (500ms).<br>
- * <br>
- *
- * Real Time Calculation :<br>
- * This application calculates top 10 url mentioned in tweets in last 5
- * minutes across a 1% random tweet sampling on a rolling window basis.<br>
- * <br>
- * Before running this application, you need to have a <a href="https://dev.twitter.com/apps">Twitter API account</a>
- * and configure the authentication. For launch from CLI, those go into ~/.dt/dt-site.xml:
- * <pre>
- * {@code
- * <?xml version="1.0" encoding="UTF-8"?>
- * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
- * <configuration>
- *
- *   <property> <name>dt.operator.TweetSampler.consumerKey</name>
- *   <value>TBD</value> </property>
- *
- *   <property> <name>dt.operator.TweetSampler.consumerSecret</name>
- *   <value>TBD</value> </property>
- *
- *   <property> <name>dt.operator.TweetSampler.accessToken</name>
- *   <value>TBD</value> </property>
- *
- *   <property> <name>dt.operator.TweetSampler.accessTokenSecret</name>
- *   <value>TBD</value> </property>
- * </configuration>
- * }
- * </pre>
- * Custom Attributes: <br>
- * <b>topCounts operator : <b>
- * <ul>
- * <li>Top Count : 10, number of top unique url to be reported.</li>
- * <li>Sliding window count : 600, report over last 5 min (600 * .5 / 60 mins)</li>
- * <li>window slide value : 1</li>
- * </ul>
- * <p>
- * Running Java Test or Main app in IDE:
- *
- * <pre>
- * LocalMode.runApp(new Application(), 600000); // 10 min run
- * </pre>
- *
- * Run Success : <br>
- * For successful deployment and run, user should see following output on
- * console:
- *
- * <pre>
- * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
- * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
- * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
- * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
- * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
- * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
- * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
- * 2013-06-17 14:38:55,201 [main] INFO  stram.StramLocalCluster run - Application finished.
- * 2013-06-17 14:38:55,201 [container-2] INFO  stram.StramChild processHeartbeatResponse - Received shutdown request
- * </pre>
- *
- * Scaling Options : <br>
- * User can scale application by setting intial partition size > 1 on count
- * unique operator. <br>
- * <br>
- *
- * Application DAG : <br>
- * <img src="doc-files/Application.gif" width=600px > <br>
- * <br>
- *
- * Streaming Window Size : 500ms(default) <br>
- * Operator Details : <br>
- * <ul>
- * <li><b>The twitterFeed operator : </b> This operator samples random public
- * statues from twitter and emits to application. <br>
- * Class : com.datatorrent.demos.twitter.TwitterSampleInput <br>
- * StateFull : No, window count 1 <br>
- * </li>
- * <li><b>The urlExtractor operator : </b> This operator extracts url from
- * random sampled statues from twitter. <br>
- * Class : {@link com.datatorrent.demos.twitter.TwitterStatusURLExtractor} <br>
- * StateFull : No, window count 1 <br>
- * </li>
- * <li><b>The uniqueCounter operator : </b> This operator aggregates count for each
- * url extracted from random samples. <br>
- * Class : {@link com.datatorrent.lib.algo.UniqueCounter} <br>
- * StateFull : No, window count 1 <br>
- * </li>
- * <li><b> The topCounts operator : </b> This operator caluculates top url in last 1
- * min sliding window count 1. <br>
- * Class : com.datatorrent.lib.algo.WindowedTopCounter <br>
- * StateFull : Yes, sliding window count 120 (1 min) <br>
- * </li>
- * <li><b>The operator Console: </b> This operator just outputs the input tuples
- * to the console (or stdout). <br>
- * </li>
- * </ul>
- *
- * @since 0.3.2
- */
-@ApplicationAnnotation(name = TwitterTopCounterApplication.APP_NAME)
-public class TwitterTopCounterApplication implements StreamingApplication
-{
-  public static final String SNAPSHOT_SCHEMA = "twitterURLDataSchema.json";
-  public static final String CONVERSION_SCHEMA = "twitterURLConverterSchema.json";
-  public static final String APP_NAME = "TwitterDemo";
-
-  private final Locality locality = null;
-
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    // Setup the operator to get the data from twitter sample stream injected into the system.
-    TwitterSampleInput twitterFeed = new TwitterSampleInput();
-    twitterFeed = dag.addOperator("TweetSampler", twitterFeed);
-
-    //  Setup the operator to get the URLs extracted from the twitter statuses
-    TwitterStatusURLExtractor urlExtractor = dag.addOperator("URLExtractor", TwitterStatusURLExtractor.class);
-
-    // Setup a node to count the unique urls within a window.
-    UniqueCounter<String> uniqueCounter = dag.addOperator("UniqueURLCounter", new UniqueCounter<String>());
-    // Get the aggregated url counts and count them over last 5 mins.
-    dag.setAttribute(uniqueCounter, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 600);
-    dag.setAttribute(uniqueCounter, Context.OperatorContext.SLIDE_BY_WINDOW_COUNT, 1);
-
-
-    WindowedTopCounter<String> topCounts = dag.addOperator("TopCounter", new WindowedTopCounter<String>());
-    topCounts.setTopCount(10);
-    topCounts.setSlidingWindowWidth(1);
-    topCounts.setDagWindowWidth(1);
-
-    // Feed the statuses from feed into the input of the url extractor.
-    dag.addStream("TweetStream", twitterFeed.status, urlExtractor.input).setLocality(Locality.CONTAINER_LOCAL);
-    //  Start counting the urls coming out of URL extractor
-    dag.addStream("TwittedURLs", urlExtractor.url, uniqueCounter.data).setLocality(locality);
-    // Count unique urls
-    dag.addStream("UniqueURLCounts", uniqueCounter.count, topCounts.input);
-
-    consoleOutput(dag, "topURLs", topCounts.output, SNAPSHOT_SCHEMA, "url");
-  }
-
-  public static void consoleOutput(DAG dag, String operatorName, OutputPort<List<Map<String, Object>>> topCount, String schemaFile, String alias)
-  {
-    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-    if (!StringUtils.isEmpty(gatewayAddress)) {
-      URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
-
-      AppDataSnapshotServerMap snapshotServer = dag.addOperator("SnapshotServer", new AppDataSnapshotServerMap());
-
-      Map<String, String> conversionMap = Maps.newHashMap();
-      conversionMap.put(alias, WindowedTopCounter.FIELD_TYPE);
-      String snapshotServerJSON = SchemaUtils.jarResourceFileToString(schemaFile);
-
-      snapshotServer.setSnapshotSchemaJSON(snapshotServerJSON);
-      snapshotServer.setTableFieldToMapField(conversionMap);
-
-      PubSubWebSocketAppDataQuery wsQuery = new PubSubWebSocketAppDataQuery();
-      wsQuery.setUri(uri);
-      snapshotServer.setEmbeddableQueryInfoProvider(wsQuery);
-
-      PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult());
-      wsResult.setUri(uri);
-      Operator.InputPort<String> queryResultPort = wsResult.input;
-
-      dag.addStream("MapProvider", topCount, snapshotServer.input);
-      dag.addStream("Result", snapshotServer.queryResult, queryResultPort);
-    } else {
-      ConsoleOutputOperator operator = dag.addOperator(operatorName, new ConsoleOutputOperator());
-      operator.setStringFormat(operatorName + ": %s");
-
-      dag.addStream("MapProvider", topCount, operator.input);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java
deleted file mode 100644
index 3953ab7..0000000
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.twitter;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.twitter.TwitterSampleInput;
-import com.datatorrent.lib.algo.UniqueCounter;
-
-/**
- * This application is same as other twitter demo
- * {@link com.datatorrent.demos.twitter.TwitterTopCounterApplication} <br>
- * Run Sample :
- *
- * <pre>
- * 2013-06-17 16:50:34,911 [Twitter Stream consumer-1[Establishing connection]] INFO  twitter4j.TwitterStreamImpl info - Connection established.
- * 2013-06-17 16:50:34,912 [Twitter Stream consumer-1[Establishing connection]] INFO  twitter4j.TwitterStreamImpl info - Receiving status stream.
- * topWords: {}
- * topWords: {love=1, ate=1, catch=1, calma=1, Phillies=1, ela=1, from=1, running=1}
- * </pre>
- *
- * @since 0.3.2
- */
-@ApplicationAnnotation(name = TwitterTopWordsApplication.APP_NAME)
-public class TwitterTopWordsApplication implements StreamingApplication
-{
-  public static final String SNAPSHOT_SCHEMA = "twitterWordDataSchema.json";
-  public static final String CONVERSION_SCHEMA = "twitterWordConverterSchema.json";
-  public static final String APP_NAME = "RollingTopWordsDemo";
-  public static final String PROP_USE_APPDATA = "dt.application." + APP_NAME + ".useAppData";
-
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    TwitterSampleInput twitterFeed = new TwitterSampleInput();
-    twitterFeed = dag.addOperator("TweetSampler", twitterFeed);
-
-    TwitterStatusWordExtractor wordExtractor = dag.addOperator("WordExtractor", TwitterStatusWordExtractor.class);
-    UniqueCounter<String> uniqueCounter = dag.addOperator("UniqueWordCounter", new UniqueCounter<String>());
-    WindowedTopCounter<String> topCounts = dag.addOperator("TopCounter", new WindowedTopCounter<String>());
-
-    topCounts.setSlidingWindowWidth(120);
-    topCounts.setDagWindowWidth(1);
-
-    dag.addStream("TweetStream", twitterFeed.text, wordExtractor.input);
-    dag.addStream("TwittedWords", wordExtractor.output, uniqueCounter.data);
-    dag.addStream("UniqueWordCounts", uniqueCounter.count, topCounts.input).setLocality(Locality.CONTAINER_LOCAL);
-
-    TwitterTopCounterApplication.consoleOutput(dag, "topWords", topCounts.output, SNAPSHOT_SCHEMA, "word");
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java
deleted file mode 100644
index 3597a92..0000000
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.twitter;
-
-import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.twitter.TwitterSampleInput;
-import com.datatorrent.lib.algo.UniqueCounter;
-
-/**
- * Twitter Demo Application: <br>
- * This demo application samples random public status from twitter, send to Hashtag
- * extractor. <br>
- * Top 10 Hashtag(s) mentioned in tweets in last 5 mins are displayed on every
- * window count (500ms).<br>
- * <br>
- *
- * Real Time Calculation :<br>
- * This application calculates top 10 Hashtag mentioned in tweets in last 5
- * minutes across a 1% random tweet sampling on a rolling window basis.<br>
- * <br>
- * Before running this application, you need to have a <a href="https://dev.twitter.com/apps">Twitter API account</a>
- * and configure the authentication. For launch from CLI, those go into ~/.dt/dt-site.xml:
- * <pre>
- * {@code
- * <?xml version="1.0" encoding="UTF-8"?>
- * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
- * <configuration>
- *
- *   <property> <name>dt.operator.TweetSampler.prop.consumerKey</name>
- *   <value>TBD</value> </property>
- *
- *   <property> <name>dt.operator.TweetSampler.prop.consumerSecret</name>
- *   <value>TBD</value> </property>
- *
- *   <property> <name>dt.operator.TweetSampler.prop.accessToken</name>
- *   <value>TBD</value> </property>
- *
- *   <property> <name>dt.operator.TweetSampler.prop.accessTokenSecret</name>
- *   <value>TBD</value> </property>
- * </configuration>
- * }
- * </pre>
- * Custom Attributes: <br>
- * <b>topCounts operator : <b>
- * <ul>
- * <li>Top Count : 10, number of top unique Hashtag to be reported.</li>
- * <li>Sliding window count : 600, report over last 5 min (600 * .5 / 60 mins)</li>
- * <li>window slide value : 1</li>
- * </ul>
- * <p>
- * Running Java Test or Main app in IDE:
- *
- * <pre>
- * LocalMode.runApp(new Application(), 600000); // 10 min run
- * </pre>
- *
- * Run Success : <br>
- * For successful deployment and run, user should see similar output on console as below:
- *
- * <pre>
- * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"مع_الله\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
- * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"مع_الله\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
- * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"مع_الله\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
- * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"مع_الله\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
- * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"مع_الله\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
- * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"مع_الله\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
- * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"مع_الله\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
- * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"مع_الله\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
- * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"مع_الله\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
- * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"مع_الله\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
- * 2013-06-17 14:38:55,201 [main] INFO  stram.StramLocalCluster run - Application finished.
- * 2013-06-17 14:38:55,201 [container-2] INFO  stram.StramChild processHeartbeatResponse - Received shutdown request
- * </pre>
- *
- * Scaling Options : <br>
- * User can scale application by setting intial partition size > 1 on count
- * unique operator. <br>
- * <br>
- *
- * Application DAG : <br>
- * <img src="doc-files/Application.gif" width=600px > <br>
- * <br>
- *
- * Streaming Window Size : 500ms(default) <br>
- * Operator Details : <br>
- * <ul>
- * <li><b>The twitterFeed operator : </b> This operator samples random public
- * statues from twitter and emits to application. <br>
- * Class : com.datatorrent.demos.twitter.TwitterSampleInput <br>
- * StateFull : No, window count 1 <br>
- * </li>
- * <li><b>The HashtagExtractor operator : </b> This operator extracts Hashtag from
- * random sampled statues from twitter. <br>
- * Class : {@link com.datatorrent.demos.twitter.TwitterStatusHashtagExtractor} <br>
- * StateFull : No, window count 1 <br>
- * </li>
- * <li><b>The uniqueCounter operator : </b> This operator aggregates count for each
- * Hashtag extracted from random samples. <br>
- * Class : {@link com.datatorrent.lib.algo.UniqueCounter} <br>
- * StateFull : No, window count 1 <br>
- * </li>
- * <li><b> The topCounts operator : </b> This operator caluculates top Hashtag in last 1
- * min sliding window count 1. <br>
- * Class : com.datatorrent.lib.algo.WindowedTopCounter <br>
- * StateFull : Yes, sliding window count 120 (1 min) <br>
- * </li>
- * <li><b>The operator Console: </b> This operator just outputs the input tuples
- * to the console (or stdout). <br>
- * </li>
- * </ul>
- *
- * @since 1.0.2
- */
-@ApplicationAnnotation(name = TwitterTrendingHashtagsApplication.APP_NAME)
-public class TwitterTrendingHashtagsApplication implements StreamingApplication
-{
-  public static final String SNAPSHOT_SCHEMA = "twitterHashTagDataSchema.json";
-  public static final String CONVERSION_SCHEMA = "twitterHashTagConverterSchema.json";
-  public static final String APP_NAME = "TwitterTrendingDemo";
-  public static final String PROP_USE_APPDATA = "dt.application." + APP_NAME + ".useAppData";
-
-  private final Locality locality = null;
-
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    // Setup the operator to get the data from twitter sample stream injected into the system.
-    TwitterSampleInput twitterFeed = new TwitterSampleInput();
-    twitterFeed = dag.addOperator("TweetSampler", twitterFeed);
-
-    // Setup a node to count the unique Hashtags within a window.
-    UniqueCounter<String> uniqueCounter = dag.addOperator("UniqueHashtagCounter", new UniqueCounter<String>());
-
-    // Get the aggregated Hashtag counts and count them over last 5 mins.
-    WindowedTopCounter<String> topCounts = dag.addOperator("TopCounter", new WindowedTopCounter<String>());
-    topCounts.setTopCount(10);
-    topCounts.setSlidingWindowWidth(600);
-    topCounts.setDagWindowWidth(1);
-
-    dag.addStream("TwittedHashtags", twitterFeed.hashtag, uniqueCounter.data).setLocality(locality);
-    // Count unique Hashtags
-    dag.addStream("UniqueHashtagCounts", uniqueCounter.count, topCounts.input);
-
-    TwitterTopCounterApplication.consoleOutput(dag, "topHashtags", topCounts.output, SNAPSHOT_SCHEMA, "hashtag");
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java
deleted file mode 100644
index 43ed8f7..0000000
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.twitter;
-
-import java.nio.ByteBuffer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * <p>URLSerDe class.</p>
- *
- * @since 0.3.2
- */
-public class URLSerDe implements StreamCodec<byte[]>
-{
-  /**
-   * Covert the bytes into object useful for downstream node.
-   *
-   * @param fragment
-   * @return WindowedURLHolder object which represents the bytes.
-   */
-  @Override
-  public byte[] fromByteArray(Slice fragment)
-  {
-    if (fragment == null || fragment.buffer == null) {
-      return null;
-    } else if (fragment.offset == 0 && fragment.length == fragment.buffer.length) {
-      return fragment.buffer;
-    } else {
-      byte[] buffer = new byte[fragment.buffer.length];
-      System.arraycopy(fragment.buffer, fragment.offset, buffer, 0, fragment.length);
-      return buffer;
-    }
-  }
-
-  /**
-   * Cast the input object to byte[].
-   *
-   * @param object - byte array representing the bytes of the string
-   * @return the same object as input
-   */
-  @Override
-  public Slice toByteArray(byte[] object)
-  {
-    return new Slice(object, 0, object.length);
-  }
-
-  @Override
-  public int getPartition(byte[] object)
-  {
-    ByteBuffer bb = ByteBuffer.wrap(object);
-    return bb.hashCode();
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(URLSerDe.class);
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java
deleted file mode 100644
index 20bb673..0000000
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.twitter;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.common.util.BaseOperator;
-
-/**
- *
- * WindowedTopCounter is an operator which counts the most often occurring tuples in a sliding window of a specific size.
- * The operator expects to receive a map object which contains a set of objects mapped to their respective frequency of
- * occurrences. e.g. if we are looking at most commonly occurring names then the operator expects to receive the tuples
- * of type Map<String, Intenger> on its input port, and at the end of the window it emits 1 object of type Map<String, Integer>
- * with a pre determined size. The emitted object contains the most frequently occurring keys.
- *
- * @param <T> Type of the key in the map object which is accepted on input port as payload. Note that this key must be HashMap friendly.
- * @since 0.3.2
- */
-public class WindowedTopCounter<T> extends BaseOperator
-{
-  public static final String FIELD_TYPE = "type";
-  public static final String FIELD_COUNT = "count";
-
-  private static final Logger logger = LoggerFactory.getLogger(WindowedTopCounter.class);
-
-  private PriorityQueue<SlidingContainer<T>> topCounter;
-  private int windows;
-  private int topCount = 10;
-  private int slidingWindowWidth;
-  private int dagWindowWidth;
-  private HashMap<T, SlidingContainer<T>> objects = new HashMap<T, SlidingContainer<T>>();
-
-  /**
-   * Input port on which map objects containing keys with their respective frequency as values will be accepted.
-   */
-  public final transient DefaultInputPort<Map<T, Integer>> input = new DefaultInputPort<Map<T, Integer>>()
-  {
-    @Override
-    public void process(Map<T, Integer> map)
-    {
-      for (Map.Entry<T, Integer> e : map.entrySet()) {
-        SlidingContainer<T> holder = objects.get(e.getKey());
-        if (holder == null) {
-          holder = new SlidingContainer<T>(e.getKey(), windows);
-          objects.put(e.getKey(), holder);
-        }
-        holder.adjustCount(e.getValue());
-      }
-    }
-  };
-
-  public final transient DefaultOutputPort<List<Map<String, Object>>> output = new DefaultOutputPort<List<Map<String, Object>>>();
-
-  @Override
-  public void setup(OperatorContext context)
-  {
-    windows = (int)(slidingWindowWidth / dagWindowWidth) + 1;
-    if (slidingWindowWidth % dagWindowWidth != 0) {
-      logger.warn("slidingWindowWidth(" + slidingWindowWidth + ") is not exact multiple of dagWindowWidth(" + dagWindowWidth + ")");
-    }
-
-    topCounter = new PriorityQueue<SlidingContainer<T>>(this.topCount, new TopSpotComparator());
-  }
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-    topCounter.clear();
-  }
-
-  @Override
-  public void endWindow()
-  {
-    Iterator<Map.Entry<T, SlidingContainer<T>>> iterator = objects.entrySet().iterator();
-    int i = topCount;
-
-    /*
-     * Try to fill the priority queue with the first topCount URLs.
-     */
-    SlidingContainer<T> holder;
-    while (iterator.hasNext()) {
-      holder = iterator.next().getValue();
-      holder.slide();
-
-      if (holder.totalCount == 0) {
-        iterator.remove();
-      } else {
-        topCounter.add(holder);
-        if (--i == 0) {
-          break;
-        }
-      }
-    }
-    logger.debug("objects.size(): {}", objects.size());
-
-    /*
-     * Make room for the new element in the priority queue by deleting the
-     * smallest one, if we KNOW that the new element is useful to us.
-     */
-    if (i == 0) {
-      int smallest = topCounter.peek().totalCount;
-      while (iterator.hasNext()) {
-        holder = iterator.next().getValue();
-        holder.slide();
-
-        if (holder.totalCount > smallest) {
-          topCounter.poll();
-          topCounter.add(holder);
-          smallest = topCounter.peek().totalCount;
-        } else if (holder.totalCount == 0) {
-          iterator.remove();
-        }
-      }
-    }
-
-    List<Map<String, Object>> data = Lists.newArrayList();
-
-    Iterator<SlidingContainer<T>> topIter = topCounter.iterator();
-
-    while (topIter.hasNext()) {
-      final SlidingContainer<T> wh = topIter.next();
-      Map<String, Object> tableRow = Maps.newHashMap();
-
-      tableRow.put(FIELD_TYPE, wh.identifier.toString());
-      tableRow.put(FIELD_COUNT, wh.totalCount);
-
-      data.add(tableRow);
-    }
-
-    Collections.sort(data, TwitterOutputSorter.INSTANCE);
-
-    output.emit(data);
-    topCounter.clear();
-  }
-
-  @Override
-  public void teardown()
-  {
-    topCounter = null;
-    objects = null;
-  }
-
-  /**
-   * Set the count of most frequently occurring keys to emit per map object.
-   *
-   * @param count count of the objects in the map emitted at the output port.
-   */
-  public void setTopCount(int count)
-  {
-    topCount = count;
-  }
-
-  public int getTopCount()
-  {
-    return topCount;
-  }
-
-  /**
-   * @return the windows
-   */
-  public int getWindows()
-  {
-    return windows;
-  }
-
-  /**
-   * @param windows the windows to set
-   */
-  public void setWindows(int windows)
-  {
-    this.windows = windows;
-  }
-
-  /**
-   * @return the slidingWindowWidth
-   */
-  public int getSlidingWindowWidth()
-  {
-    return slidingWindowWidth;
-  }
-
-  /**
-   * Set the width of the sliding window.
-   *
-   * Sliding window is typically much larger than the dag window. e.g. One may want to measure the most frequently
-   * occurring keys over the period of 5 minutes. So if dagWindowWidth (which is by default 500ms) is set to 500ms,
-   * the slidingWindowWidth would be (60 * 5 * 1000 =) 300000.
-   *
-   * @param slidingWindowWidth - Sliding window width to be set for this operator, recommended to be multiple of DAG window.
-   */
-  public void setSlidingWindowWidth(int slidingWindowWidth)
-  {
-    this.slidingWindowWidth = slidingWindowWidth;
-  }
-
-  /**
-   * @return the dagWindowWidth
-   */
-  public int getDagWindowWidth()
-  {
-    return dagWindowWidth;
-  }
-
-  /**
-   * Set the width of the sliding window.
-   *
-   * Sliding window is typically much larger than the dag window. e.g. One may want to measure the most frequently
-   * occurring keys over the period of 5 minutes. So if dagWindowWidth (which is by default 500ms) is set to 500ms,
-   * the slidingWindowWidth would be (60 * 5 * 1000 =) 300000.
-   *
-   * @param dagWindowWidth - DAG's native window width. It has to be the value of the native window set at the application level.
-   */
-  public void setDagWindowWidth(int dagWindowWidth)
-  {
-    this.dagWindowWidth = dagWindowWidth;
-  }
-
-  static class TopSpotComparator implements Comparator<SlidingContainer<?>>
-  {
-    @Override
-    public int compare(SlidingContainer<?> o1, SlidingContainer<?> o2)
-    {
-      if (o1.totalCount > o2.totalCount) {
-        return 1;
-      } else if (o1.totalCount < o2.totalCount) {
-        return -1;
-      }
-
-      return 0;
-    }
-  }
-
-  private static class TwitterOutputSorter implements Comparator<Map<String, Object>>
-  {
-    public static final TwitterOutputSorter INSTANCE = new TwitterOutputSorter();
-
-    private TwitterOutputSorter()
-    {
-    }
-
-    @Override
-    public int compare(Map<String, Object> o1, Map<String, Object> o2)
-    {
-      Integer count1 = (Integer)o1.get(FIELD_COUNT);
-      Integer count2 = (Integer)o2.get(FIELD_COUNT);
-
-      return count1.compareTo(count2);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/doc-files/Application.gif
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/doc-files/Application.gif b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/doc-files/Application.gif
deleted file mode 100644
index d21e1d9..0000000
Binary files a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/doc-files/Application.gif and /dev/null differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/package-info.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/package-info.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/package-info.java
deleted file mode 100644
index 5a02e4b..0000000
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Twitter top URL's demonstration application.
- */
-package com.datatorrent.demos.twitter;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/resources/META-INF/properties-TwitterKinesisDemo.xml
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/resources/META-INF/properties-TwitterKinesisDemo.xml b/demos/twitter/src/main/resources/META-INF/properties-TwitterKinesisDemo.xml
deleted file mode 100644
index 7d45153..0000000
--- a/demos/twitter/src/main/resources/META-INF/properties-TwitterKinesisDemo.xml
+++ /dev/null
@@ -1,52 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-
-<!-- properties for the twitter kinesis demo -->
-<configuration>
-
-    <property>
-        <name>dt.operator.FromKinesis.streamName</name>
-        <value>TwitterTag</value>
-    </property>
-    <property>
-        <name>dt.operator.FromKinesis.accessKey</name>
-    </property>
-    <property>
-        <name>dt.operator.FromKinesis.secretKey</name>
-    </property>
-    <property>
-        <name>dt.operator.FromKinesis.endPoint</name>
-    </property>
-    <property>
-        <name>dt.operator.ToKinesis.streamName</name>
-        <value>TwitterTag</value>
-    </property>
-    <property>
-        <name>dt.operator.ToKinesis.accessKey</name>
-    </property>
-    <property>
-        <name>dt.operator.ToKinesis.secretKey</name>
-    </property>
-    <property>
-        <name>dt.operator.ToKinesis.endPoint</name>
-    </property>
-
-</configuration>


Mime
View raw message