streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject [2/4] incubator-streams-examples git commit: refactored to use streams-filters for routing Deletes to PersistDeleter
Date Tue, 07 Apr 2015 01:57:46 GMT
refactored to use streams-filters for routing Deletes to PersistDeleter


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/83f8471d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/83f8471d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/83f8471d

Branch: refs/heads/master
Commit: 83f8471d6676249fd9c0b1cd1cb20e6baaecc9bc
Parents: 28d0195
Author: Steve Blackmon (@steveblackmon) <sblackmon@apache.org>
Authored: Mon Mar 30 18:20:40 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sblackmon@apache.org>
Committed: Mon Mar 30 18:20:40 2015 -0500

----------------------------------------------------------------------
 local/twitter-userstream-elasticsearch/pom.xml  | 45 +++++++++++
 .../example/TwitterUserstreamElasticsearch.java | 83 +++-----------------
 .../TwitterUserstreamElasticsearch.dot          |  4 +-
 3 files changed, 59 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/83f8471d/local/twitter-userstream-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/pom.xml b/local/twitter-userstream-elasticsearch/pom.xml
index 1a3074c..f438e77 100644
--- a/local/twitter-userstream-elasticsearch/pom.xml
+++ b/local/twitter-userstream-elasticsearch/pom.xml
@@ -81,6 +81,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
+            <artifactId>streams-filters</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
             <artifactId>streams-provider-twitter</artifactId>
             <version>${project.version}</version>
         </dependency>
@@ -240,4 +245,44 @@
             </plugin>
         </plugins>
     </build>
+
+    <profiles>
+        <profile>
+            <id>docker</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <!-- The Docker Maven plugin is used to create docker image with
the fat jar -->
+                        <groupId>org.jolokia</groupId>
+                        <artifactId>docker-maven-plugin</artifactId>
+                        <version>0.11.0</version>
+                        <configuration>
+                            <images>
+
+                                <image>
+                                    <alias>${project.artifactId}</alias>
+                                    <name>${project.artifactId}:${project.version}</name>
+                                    <build>
+                                        <from>dockerfile/java:oracle-java8</from>
+                                        <assembly>
+                                            <basedir>/</basedir>
+                                            <descriptorRef>artifact</descriptorRef>
+                                        </assembly>
+                                        <!-- Default command for the build image -->
+                                    </build>
+
+                                </image>
+
+                            </images>
+                        </configuration>
+
+                    </plugin>
+
+                </plugins>
+            </build>
+            <activation>
+                <activeByDefault/>
+            </activation>
+        </profile>
+    </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/83f8471d/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
index f24b1c4..2e7d2cd 100644
--- a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
+++ b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
@@ -18,45 +18,26 @@
 
 package org.apache.streams.twitter.example;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
+import com.google.common.collect.Sets;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.data.util.ActivityUtil;
-import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
+import org.apache.streams.converter.ActivityConverterProcessor;
 import org.apache.streams.elasticsearch.ElasticsearchPersistDeleter;
 import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
 import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
 import org.apache.streams.example.twitter.TwitterUserstreamElasticsearchConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.filters.VerbDefinitionDropFilter;
+import org.apache.streams.filters.VerbDefinitionKeepFilter;
 import org.apache.streams.local.builders.LocalStreamBuilder;
 import org.apache.streams.core.StreamBuilder;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.apache.streams.pojo.json.Actor;
-import org.apache.streams.pojo.json.Delete;
-import org.apache.streams.pojo.json.Follow;
-import org.apache.streams.pojo.json.Page;
 import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.apache.streams.twitter.pojo.FriendList;
-import org.apache.streams.twitter.pojo.UserstreamEvent;
-import org.apache.streams.twitter.processor.TwitterTypeConverter;
-import org.apache.streams.twitter.provider.TwitterConfigurator;
 import org.apache.streams.twitter.provider.TwitterStreamProvider;
-import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
-import org.apache.streams.twitter.serializer.util.TwitterActivityUtil;
+import org.apache.streams.verbs.VerbDefinition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * Example stream that populates elasticsearch with activities from twitter userstream in
real-time
@@ -67,6 +48,8 @@ public class TwitterUserstreamElasticsearch implements Runnable {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearch.class);
 
+    private static VerbDefinition deleteVerbDefinition = new VerbDefinition().withValue("post");
+
     TwitterUserstreamElasticsearchConfiguration config;
 
     public TwitterUserstreamElasticsearch() {
@@ -94,67 +77,25 @@ public class TwitterUserstreamElasticsearch implements Runnable {
         ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = config.getElasticsearch();
 
         TwitterStreamProvider stream = new TwitterStreamProvider(twitterStreamConfiguration);
-        TwitterTypeConverter converter = new TwitterTypeConverter(ObjectNode.class, Activity.class);
+        ActivityConverterProcessor converter = new ActivityConverterProcessor();
+        VerbDefinitionDropFilter noDeletesProcessor = new VerbDefinitionDropFilter(Sets.newHashSet(deleteVerbDefinition));
         ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(elasticsearchWriterConfiguration);
-        DeleteOnlyProcessor deleteOnlyProcessor = new DeleteOnlyProcessor();
-        NoDeletesProcessor noDeletesProcessor = new NoDeletesProcessor();
+        VerbDefinitionKeepFilter deleteOnlyProcessor = new VerbDefinitionKeepFilter(Sets.newHashSet(deleteVerbDefinition));
         ElasticsearchPersistDeleter deleter = new ElasticsearchPersistDeleter(elasticsearchWriterConfiguration);
 
         Map<String, Object> streamConfig = Maps.newHashMap();
         streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 12 * 60 * 1000);
-        StreamBuilder builder = new LocalStreamBuilder(25);
+        StreamBuilder builder = new LocalStreamBuilder(25, streamConfig);
 
         builder.newPerpetualStream(TwitterStreamProvider.STREAMS_ID, stream);
         builder.addStreamsProcessor("converter", converter, 2, TwitterStreamProvider.STREAMS_ID);
         builder.addStreamsProcessor("NoDeletesProcessor", noDeletesProcessor, 1, "converter");
         builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, writer, 1,
"NoDeletesProcessor");
         builder.addStreamsProcessor("DeleteOnlyProcessor", deleteOnlyProcessor, 1, "converter");
-        builder.addStreamsPersistWriter(ElasticsearchPersistDeleter.STREAMS_ID, deleter,
1, "DeleteOnlyProcessor");
+        builder.addStreamsPersistWriter("deleter", deleter, 1, "DeleteOnlyProcessor");
 
         builder.start();
 
     }
 
-    private class DeleteOnlyProcessor implements StreamsProcessor
-    {
-        String delete = new Delete().getVerb();
-
-        @Override
-        public void prepare(Object configurationObject) {}
-
-        @Override
-        public void cleanUp() {}
-
-        @Override
-        public List<StreamsDatum> process(StreamsDatum entry) {
-            Preconditions.checkArgument(entry.getDocument() instanceof Activity);
-            Activity activity = (Activity) entry.getDocument();
-            if( activity.getVerb().equals(delete))
-                return Lists.newArrayList(entry);
-            else
-                return Lists.newArrayList();
-        }
-    }
-
-    private class NoDeletesProcessor implements StreamsProcessor
-    {
-        String delete = new Delete().getVerb();
-
-        @Override
-        public void prepare(Object configurationObject) {}
-
-        @Override
-        public void cleanUp() {}
-
-        @Override
-        public List<StreamsDatum> process(StreamsDatum entry) {
-            Preconditions.checkArgument(entry.getDocument() instanceof Activity);
-            Activity activity = (Activity) entry.getDocument();
-            if( activity.getVerb().equals(delete))
-                return Lists.newArrayList();
-            else
-                return Lists.newArrayList(entry);
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/83f8471d/local/twitter-userstream-elasticsearch/src/main/resources/TwitterUserstreamElasticsearch.dot
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/src/main/resources/TwitterUserstreamElasticsearch.dot
b/local/twitter-userstream-elasticsearch/src/main/resources/TwitterUserstreamElasticsearch.dot
index 5302bf9..62aa637 100644
--- a/local/twitter-userstream-elasticsearch/src/main/resources/TwitterUserstreamElasticsearch.dot
+++ b/local/twitter-userstream-elasticsearch/src/main/resources/TwitterUserstreamElasticsearch.dot
@@ -5,8 +5,8 @@ digraph g {
 
   //processors
   TwitterTypeConverter [label="TwitterTypeConverter",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java"];
-  DeleteOnlyProcessor  [label="DeleteOnlyProcessor ",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java"];
-  NoDeletesProcessor  [label="NoDeletes",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java"];
+  DeleteOnlyProcessor [label="VerbDefinitionKeepFilter (verb:post)",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionKeepFilter.java"];
+  NoDeletesProcessor  [label="VerbDefinitionDropFilter (verb:post)",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionDropFilter.java"];
    
   //persisters
   ElasticsearchPersistWriter [label="ElasticsearchPersistWriter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java"];


Mime
View raw message