streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject [1/3] incubator-streams-examples git commit: basic implementation (untested)
Date Tue, 31 Mar 2015 15:48:11 GMT
Repository: incubator-streams-examples
Updated Branches:
  refs/heads/STREAMS-290 [created] 96129f444


basic implementation (untested)


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/28d01953
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/28d01953
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/28d01953

Branch: refs/heads/STREAMS-290
Commit: 28d01953c961c1bb0b94ead989b01db4f98b3a0b
Parents: 076b3ac
Author: sblackmon <sblackmon@apache.org>
Authored: Fri Feb 20 09:44:40 2015 -0600
Committer: sblackmon <sblackmon@apache.org>
Committed: Mon Feb 23 10:39:24 2015 -0600

----------------------------------------------------------------------
 local/pom.xml                                   |   2 +
 .../twitter-userstream-elasticsearch/README.md  |  72 ++++++
 local/twitter-userstream-elasticsearch/pom.xml  | 243 +++++++++++++++++++
 .../example/TwitterUserstreamElasticsearch.java | 160 ++++++++++++
 .../twitter/TwitterUserstreamElasticsearch.json |  14 ++
 .../TwitterUserstreamElasticsearch.dot          |  26 ++
 .../src/main/resources/application.conf         |  22 ++
 .../twitter-userstream-elasticsearch.png        | Bin 0 -> 16906 bytes
 8 files changed, 539 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/28d01953/local/pom.xml
----------------------------------------------------------------------
diff --git a/local/pom.xml b/local/pom.xml
index b75e138..30e0206 100644
--- a/local/pom.xml
+++ b/local/pom.xml
@@ -37,6 +37,8 @@
     </properties>
 
     <modules>
+
+        <module>twitter-userstream-elasticsearch</module>
     </modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/28d01953/local/twitter-userstream-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/README.md b/local/twitter-userstream-elasticsearch/README.md
new file mode 100644
index 0000000..e2b4a2d
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/README.md
@@ -0,0 +1,72 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+twitter-userstream-elasticsearch
+==============================
+
+Requirements:
+-------------
+ - Authorized Twitter API credentials
+ - A running ElasticSearch 1.0.0+ instance
+
+Description:
+------------
+This example connects to an active Twitter account and stores the userstream as activities in Elasticsearch.
+
+Specification:
+-----------------
+
+[TwitterUserstreamElasticsearch.dot](src/main/resources/TwitterUserstreamElasticsearch.dot
"TwitterUserstreamElasticsearch.dot" )
+
+Diagram:
+-----------------
+
+![twitter-userstream-elasticsearch.png](./twitter-userstream-elasticsearch.png?raw=true)
+
+Example Configuration:
+----------------------
+
+    twitter {
+        endpoint = "userstream"
+        oauth {
+                consumerKey = "bcg14JThZEGoZ3MZOoT2HnJS7"
+                consumerSecret = "S4dwxnZni58CIJaoupGnUrO4HRHmbBGOb28W6IqOJBx36LPw2z"
+                accessToken = ""
+                accessTokenSecret = ""
+        }
+    }
+    elasticsearch {
+        hosts = [
+            localhost
+        ]
+        port = 9300
+        clusterName = elasticsearch
+        index = userstream_activity
+        type = activity
+        batchSize = 1
+    }
+
+The consumerKey and consumerSecret are set for our streams-example application
+The accessToken and accessTokenSecret can be obtained by navigating to:
+ https://api.twitter.com/oauth/authenticate?oauth_token=UIJ0AUxCJatpKDUyFt0OTSEP4asZgqxRwUCT0AMSwc&oauth_callback=http%3A%2F%2Foauth.streamstutorial.w2odata.com%3A8080%2Fsocialauthdemo%2FsocialAuthSuccessAction.do
+
+Build:
+---------
+
+`mvn clean package verify`
+
+Run:
+--------
+
+`java -cp target/twitter-userstream-elasticsearch-0.2-incubating-SNAPSHOT.jar -Dconfig.file=src/main/resources/application.conf org.apache.streams.twitter.example.TwitterUserstreamElasticsearch`
+
+Deploy:
+--------
+`mvn -Pdocker clean package docker:build`
+
+`docker tag twitter-userstream-elasticsearch:0.2-incubating-SNAPSHOT <dockerregistry>/twitter-userstream-elasticsearch:0.2-incubating-SNAPSHOT`
+
+`docker push <dockerregistry>/twitter-userstream-elasticsearch:0.2-incubating-SNAPSHOT`
+
+`docker run <dockerregistry>/twitter-userstream-elasticsearch:0.2-incubating-SNAPSHOT java -cp twitter-userstream-elasticsearch-0.2-incubating-SNAPSHOT.jar -Dconfig.file=http://<location_of_config_file>.json org.apache.streams.twitter.example.TwitterUserstreamElasticsearch`

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/28d01953/local/twitter-userstream-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/pom.xml b/local/twitter-userstream-elasticsearch/pom.xml
new file mode 100644
index 0000000..1a3074c
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/pom.xml
@@ -0,0 +1,243 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-examples-local</artifactId>
+        <version>0.2-incubating-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>twitter-userstream-elasticsearch</artifactId>
+
+    <properties>
+        <elasticsearch.version>1.1.0</elasticsearch.version>
+        <lucene.version>4.7.2</lucene.version>
+    </properties>
+
+    <dependencies>
+        <!-- Test includes -->
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-test-framework</artifactId>
+            <version>${lucene.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-codecs</artifactId>
+            <version>${lucene.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>${elasticsearch.version}</version>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-runtime-local</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-provider-twitter</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-persist-elasticsearch</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-clean-plugin</artifactId>
+                <configuration>
+                    <filesets>
+                        <fileset>
+                            <directory>data</directory>
+                            <followSymlinks>false</followSymlinks>
+                        </fileset>
+                    </filesets>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <finalName>${project.build.finalName}</finalName>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"
/>
+                                <!-- Main-Class must match the package declared in TwitterUserstreamElasticsearch.java -->
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>org.apache.streams.twitter.example.TwitterUserstreamElasticsearch</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <version>0.4.1</version>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.example.elasticsearch</targetPackage>
+                    <useJodaDates>false</useJodaDates>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>2.4</version>
+                <executions>
+                    <execution>
+                        <id>resource-dependencies</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>unpack-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <includeArtifactIds>streams-pojo</includeArtifactIds>
+                            <includes>**/*.json</includes>
+                            <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <version>2.12.4</version>
+                <executions>
+                    <execution>
+                        <id>integration-tests</id>
+                        <goals>
+                            <goal>integration-test</goal>
+                            <goal>verify</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/28d01953/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
new file mode 100644
index 0000000..f24b1c4
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.example;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchPersistDeleter;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.example.twitter.TwitterUserstreamElasticsearchConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.pojo.json.Delete;
+import org.apache.streams.pojo.json.Follow;
+import org.apache.streams.pojo.json.Page;
+import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.pojo.FriendList;
+import org.apache.streams.twitter.pojo.UserstreamEvent;
+import org.apache.streams.twitter.processor.TwitterTypeConverter;
+import org.apache.streams.twitter.provider.TwitterConfigurator;
+import org.apache.streams.twitter.provider.TwitterStreamProvider;
+import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
+import org.apache.streams.twitter.serializer.util.TwitterActivityUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Example stream that populates Elasticsearch with activities from the Twitter userstream in real-time.
+ *
+ * <p>{@link #run()} wires a {@link TwitterStreamProvider} into a {@link TwitterTypeConverter} and
+ * then splits the converted activities: delete activities are routed to an
+ * {@link ElasticsearchPersistDeleter}, everything else to an {@link ElasticsearchPersistWriter}.
+ */
+public class TwitterUserstreamElasticsearch implements Runnable {
+
+    public final static String STREAMS_ID = "TwitterUserstreamElasticsearch";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearch.class);
+
+    /** Combined Twitter and Elasticsearch settings used by {@link #run()}. */
+    TwitterUserstreamElasticsearchConfiguration config;
+
+    /**
+     * Creates the example using the configuration detected from {@link StreamsConfigurator#getConfig()}.
+     */
+    public TwitterUserstreamElasticsearch() {
+        this(new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+
+    }
+
+    /**
+     * Creates the example with an explicit configuration.
+     *
+     * @param config settings supplying the Twitter and Elasticsearch sections read by {@link #run()}
+     */
+    public TwitterUserstreamElasticsearch(TwitterUserstreamElasticsearchConfiguration config) {
+        this.config = config;
+    }
+
+    /**
+     * Logs the resolved configuration and starts the stream on a new thread.
+     *
+     * @param args command-line arguments (unused)
+     */
+    public static void main(String[] args)
+    {
+        LOGGER.info(StreamsConfigurator.config.toString());
+
+        TwitterUserstreamElasticsearch userstream = new TwitterUserstreamElasticsearch();
+        new Thread(userstream).start();
+
+    }
+
+    /**
+     * Builds the provider / converter / filter / persister topology and starts it.
+     */
+    @Override
+    public void run() {
+
+        TwitterStreamConfiguration twitterStreamConfiguration = config.getTwitter();
+        ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = config.getElasticsearch();
+
+        TwitterStreamProvider stream = new TwitterStreamProvider(twitterStreamConfiguration);
+        TwitterTypeConverter converter = new TwitterTypeConverter(ObjectNode.class, Activity.class);
+        ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(elasticsearchWriterConfiguration);
+        DeleteOnlyProcessor deleteOnlyProcessor = new DeleteOnlyProcessor();
+        NoDeletesProcessor noDeletesProcessor = new NoDeletesProcessor();
+        ElasticsearchPersistDeleter deleter = new ElasticsearchPersistDeleter(elasticsearchWriterConfiguration);
+
+        Map<String, Object> streamConfig = Maps.newHashMap();
+        // NOTE(review): value is presumably milliseconds (12 minutes) - confirm against LocalStreamBuilder.
+        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 12 * 60 * 1000);
+        // Pass the settings to the builder; the map was previously populated but never used.
+        StreamBuilder builder = new LocalStreamBuilder(25, streamConfig);
+
+        builder.newPerpetualStream(TwitterStreamProvider.STREAMS_ID, stream);
+        builder.addStreamsProcessor("converter", converter, 2, TwitterStreamProvider.STREAMS_ID);
+
+        // Branch 1: everything that is not a delete gets written to the index.
+        builder.addStreamsProcessor("NoDeletesProcessor", noDeletesProcessor, 1, "converter");
+        builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, writer, 1, "NoDeletesProcessor");
+
+        // Branch 2: delete activities are handed to the deleter.
+        builder.addStreamsProcessor("DeleteOnlyProcessor", deleteOnlyProcessor, 1, "converter");
+        builder.addStreamsPersistWriter(ElasticsearchPersistDeleter.STREAMS_ID, deleter, 1, "DeleteOnlyProcessor");
+
+        builder.start();
+
+    }
+
+    /**
+     * Passes through only those datums whose {@link Activity} verb equals the verb of {@link Delete}.
+     *
+     * <p>Declared static because it uses no state of the enclosing class.
+     */
+    private static class DeleteOnlyProcessor implements StreamsProcessor
+    {
+        // assumes Delete supplies a non-null default verb - TODO confirm against the streams-pojo schema
+        private final String delete = new Delete().getVerb();
+
+        @Override
+        public void prepare(Object configurationObject) {}
+
+        @Override
+        public void cleanUp() {}
+
+        /**
+         * @param entry datum whose document must be an {@link Activity}
+         * @return a single-element list holding {@code entry} for delete activities, otherwise an empty list
+         * @throws IllegalArgumentException if the document is not an {@link Activity}
+         */
+        @Override
+        public List<StreamsDatum> process(StreamsDatum entry) {
+            Preconditions.checkArgument(entry.getDocument() instanceof Activity);
+            Activity activity = (Activity) entry.getDocument();
+            // Compare from the constant side so an activity without a verb is treated as "not a delete".
+            if( delete.equals(activity.getVerb()))
+                return Lists.newArrayList(entry);
+            else
+                return Lists.newArrayList();
+        }
+    }
+
+    /**
+     * Passes through every datum except those whose {@link Activity} verb equals the verb of {@link Delete}.
+     *
+     * <p>Declared static because it uses no state of the enclosing class.
+     */
+    private static class NoDeletesProcessor implements StreamsProcessor
+    {
+        // assumes Delete supplies a non-null default verb - TODO confirm against the streams-pojo schema
+        private final String delete = new Delete().getVerb();
+
+        @Override
+        public void prepare(Object configurationObject) {}
+
+        @Override
+        public void cleanUp() {}
+
+        /**
+         * @param entry datum whose document must be an {@link Activity}
+         * @return an empty list for delete activities, otherwise a single-element list holding {@code entry}
+         * @throws IllegalArgumentException if the document is not an {@link Activity}
+         */
+        @Override
+        public List<StreamsDatum> process(StreamsDatum entry) {
+            Preconditions.checkArgument(entry.getDocument() instanceof Activity);
+            Activity activity = (Activity) entry.getDocument();
+            // Compare from the constant side so an activity without a verb is treated as "not a delete".
+            if( delete.equals(activity.getVerb()))
+                return Lists.newArrayList();
+            else
+                return Lists.newArrayList(entry);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/28d01953/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearch.json
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearch.json
b/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearch.json
new file mode 100644
index 0000000..d2167a8
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/src/main/jsonschema/org/apache/streams/example/twitter/TwitterUserstreamElasticsearch.json
@@ -0,0 +1,14 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.openwebfoundation.org/legal/the-owf-1-0-agreements/owfa-1-0",
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.example.twitter.TwitterUserstreamElasticsearchConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "twitter": { "javaType": "org.apache.streams.twitter.TwitterStreamConfiguration", "type":
"object", "required": true },
+    "elasticsearch": { "javaType": "org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration",
"type": "object", "required": true }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/28d01953/local/twitter-userstream-elasticsearch/src/main/resources/TwitterUserstreamElasticsearch.dot
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/src/main/resources/TwitterUserstreamElasticsearch.dot
b/local/twitter-userstream-elasticsearch/src/main/resources/TwitterUserstreamElasticsearch.dot
new file mode 100644
index 0000000..5302bf9
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/src/main/resources/TwitterUserstreamElasticsearch.dot
@@ -0,0 +1,26 @@
+digraph g {
+
+  //providers
+  TwitterStreamProvider [label="TwitterStreamProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java"];
+
+  //processors
+  TwitterTypeConverter [label="TwitterTypeConverter",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java"];
+  DeleteOnlyProcessor  [label="DeleteOnlyProcessor ",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java"];
+  NoDeletesProcessor  [label="NoDeletes",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java"];
+   
+  //persisters
+  ElasticsearchPersistWriter [label="ElasticsearchPersistWriter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java"];
+  ElasticsearchPersistDeleter [label="ElasticsearchPersistDeleter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java"];
+
+   //data
+  es [label="es://{index}/{type}",shape=box];
+
+  //stream
+  TwitterStreamProvider -> TwitterTypeConverter [label="ObjectNode"];
+  TwitterTypeConverter -> DeleteOnlyProcessor [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
+  TwitterTypeConverter -> NoDeletesProcessor [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
+  DeleteOnlyProcessor -> ElasticsearchPersistDeleter [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
+  NoDeletesProcessor -> ElasticsearchPersistWriter [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
+  ElasticsearchPersistWriter -> es [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
+  ElasticsearchPersistDeleter -> es [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/28d01953/local/twitter-userstream-elasticsearch/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/src/main/resources/application.conf b/local/twitter-userstream-elasticsearch/src/main/resources/application.conf
new file mode 100644
index 0000000..50d48b5
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/src/main/resources/application.conf
@@ -0,0 +1,22 @@
+twitter {
+    endpoint = "userstream"
+    oauth {
+        consumerKey = ""
+        consumerSecret = ""
+        accessToken = ""
+        accessTokenSecret = ""
+    }
+    follow = [
+           
+    ]
+}
+elasticsearch {
+    hosts = [
+        localhost
+    ]
+    port = 9300
+    clusterName = elasticsearch
+    index = userstream_activity
+    type = activity
+    batchSize = 1
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/28d01953/local/twitter-userstream-elasticsearch/twitter-userstream-elasticsearch.png
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/twitter-userstream-elasticsearch.png b/local/twitter-userstream-elasticsearch/twitter-userstream-elasticsearch.png
new file mode 100644
index 0000000..de2b99f
Binary files /dev/null and b/local/twitter-userstream-elasticsearch/twitter-userstream-elasticsearch.png
differ


Mime
View raw message