streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject [7/9] incubator-streams-examples git commit: STREAMS-474: Apply check style requirements to examples
Date Sat, 17 Dec 2016 19:59:00 GMT
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/MongoElasticsearchSync.java
----------------------------------------------------------------------
diff --git a/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/MongoElasticsearchSync.java b/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/MongoElasticsearchSync.java
index 4527a6b..2d994b9 100644
--- a/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/MongoElasticsearchSync.java
+++ b/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/MongoElasticsearchSync.java
@@ -22,61 +22,56 @@ import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.local.LocalRuntimeConfiguration;
 import org.apache.streams.local.builders.LocalStreamBuilder;
 import org.apache.streams.mongo.MongoPersistReader;
 
-import com.google.common.collect.Maps;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 /**
- * Copies documents into a new index
+ * Copies a mongo collection to an elasticsearch index.
  */
 public class MongoElasticsearchSync implements Runnable {
 
-    public final static String STREAMS_ID = "MongoElasticsearchSync";
+  public final static String STREAMS_ID = "MongoElasticsearchSync";
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSync.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSync.class);
 
-    MongoElasticsearchSyncConfiguration config;
+  MongoElasticsearchSyncConfiguration config;
 
-    public MongoElasticsearchSync() {
-        this(new ComponentConfigurator<MongoElasticsearchSyncConfiguration>(MongoElasticsearchSyncConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
-    }
+  public MongoElasticsearchSync() {
+    this(new ComponentConfigurator<MongoElasticsearchSyncConfiguration>(MongoElasticsearchSyncConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+  }
 
-    public MongoElasticsearchSync(MongoElasticsearchSyncConfiguration config) {
-        this.config = config;
-    }
+  public MongoElasticsearchSync(MongoElasticsearchSyncConfiguration config) {
+    this.config = config;
+  }
 
-    public static void main(String[] args)
-    {
-        LOGGER.info(StreamsConfigurator.config.toString());
+  public static void main(String[] args)
+  {
+    LOGGER.info(StreamsConfigurator.config.toString());
 
-        MongoElasticsearchSync sync = new MongoElasticsearchSync();
+    MongoElasticsearchSync sync = new MongoElasticsearchSync();
 
-        new Thread(sync).start();
+    new Thread(sync).start();
 
-    }
+  }
 
-    @Override
-    public void run() {
+  @Override
+  public void run() {
 
-        MongoPersistReader mongoPersistReader = new MongoPersistReader(config.getSource());
+    MongoPersistReader mongoPersistReader = new MongoPersistReader(config.getSource());
 
-        ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
+    ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
 
-        LocalRuntimeConfiguration localRuntimeConfiguration = new LocalRuntimeConfiguration();
-        localRuntimeConfiguration.setIdentifier(STREAMS_ID);
-        localRuntimeConfiguration.setTaskTimeoutMs((long)(60 * 1000));
-        localRuntimeConfiguration.setQueueSize((long)1000);
-        StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
+    LocalRuntimeConfiguration localRuntimeConfiguration =
+        StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+    StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
 
-        builder.newPerpetualStream(MongoPersistReader.class.getCanonicalName(), mongoPersistReader);
-        builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, MongoPersistReader.class.getCanonicalName());
-        builder.start();
-    }
+    builder.newPerpetualStream(MongoPersistReader.class.getCanonicalName(), mongoPersistReader);
+    builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, MongoPersistReader.class.getCanonicalName());
+    builder.start();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/test/MongoElasticsearchSyncIT.java
----------------------------------------------------------------------
diff --git a/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/test/MongoElasticsearchSyncIT.java b/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/test/MongoElasticsearchSyncIT.java
index 02af293..84d0fba 100644
--- a/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/test/MongoElasticsearchSyncIT.java
+++ b/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/test/MongoElasticsearchSyncIT.java
@@ -28,7 +28,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -51,55 +50,55 @@ import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 
 /**
- * Test copying documents between two indexes on same cluster
+ * MongoElasticsearchSyncIT is an integration test for MongoElasticsearchSync.
  */
 public class MongoElasticsearchSyncIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSyncIT.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSyncIT.class);
 
-    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-    protected MongoElasticsearchSyncConfiguration testConfiguration;
-    protected Client testClient;
+  protected MongoElasticsearchSyncConfiguration testConfiguration;
+  protected Client testClient;
 
-    @BeforeClass
-    public void prepareTest() throws Exception {
+  @BeforeClass
+  public void prepareTest() throws Exception {
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/MongoElasticsearchSyncIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new ComponentConfigurator<>(MongoElasticsearchSyncConfiguration.class).detectConfiguration(typesafe);
-        testClient = ElasticsearchClientManager.getInstance(testConfiguration.getDestination()).client();
+    Config reference  = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/MongoElasticsearchSyncIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(MongoElasticsearchSyncConfiguration.class).detectConfiguration(typesafe);
+    testClient = ElasticsearchClientManager.getInstance(testConfiguration.getDestination()).client();
 
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+    ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+    ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+    assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
 
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertFalse(indicesExistsResponse.isExists());
-    }
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    assertFalse(indicesExistsResponse.isExists());
+  }
 
-    @Test
-    public void testSync() throws Exception {
+  @Test
+  public void testSync() throws Exception {
 
-        MongoElasticsearchSync sync = new MongoElasticsearchSync(testConfiguration);
+    MongoElasticsearchSync sync = new MongoElasticsearchSync(testConfiguration);
 
-        sync.run();
+    sync.run();
 
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertTrue(indicesExistsResponse.isExists());
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    assertTrue(indicesExistsResponse.isExists());
 
-        // assert lines in file
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getDestination().getIndex())
-                .setTypes(testConfiguration.getDestination().getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    // assert lines in file
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getDestination().getIndex())
+        .setTypes(testConfiguration.getDestination().getType());
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        assertEquals(89, (int)countResponse.getHits().getTotalHits());
+    assertEquals(89, (int)countResponse.getHits().getTotalHits());
 
-    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/mongo-elasticsearch-sync/src/test/resources/MongoElasticsearchSyncIT.conf
----------------------------------------------------------------------
diff --git a/local/mongo-elasticsearch-sync/src/test/resources/MongoElasticsearchSyncIT.conf b/local/mongo-elasticsearch-sync/src/test/resources/MongoElasticsearchSyncIT.conf
index 61e61d7..86a41b6 100644
--- a/local/mongo-elasticsearch-sync/src/test/resources/MongoElasticsearchSyncIT.conf
+++ b/local/mongo-elasticsearch-sync/src/test/resources/MongoElasticsearchSyncIT.conf
@@ -30,4 +30,5 @@
     "type": "activity",
     "forceUseConfig": true
   }
+  taskTimeoutMs = 60000
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java b/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
index 34ac8c4..5ffb6ed 100644
--- a/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
+++ b/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java
@@ -18,7 +18,6 @@
 
 package org.apache.streams.example;
 
-import com.google.common.collect.Lists;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.converter.ActivityConverterProcessor;
@@ -27,14 +26,17 @@ import org.apache.streams.converter.TypeConverterProcessor;
 import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.data.ActivityConverter;
 import org.apache.streams.data.DocumentClassifier;
-import org.apache.streams.example.TwitterFollowNeo4jConfiguration;
 import org.apache.streams.graph.GraphHttpConfiguration;
 import org.apache.streams.graph.GraphHttpPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
 import org.apache.streams.local.builders.LocalStreamBuilder;
 import org.apache.streams.twitter.TwitterFollowingConfiguration;
+import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
 import org.apache.streams.twitter.converter.TwitterFollowActivityConverter;
 import org.apache.streams.twitter.provider.TwitterFollowingProvider;
-import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
+
+import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,50 +46,53 @@ import org.slf4j.LoggerFactory;
  */
 public class TwitterFollowNeo4j implements Runnable {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4j.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4j.class);
+
+  TwitterFollowNeo4jConfiguration config;
 
-    TwitterFollowNeo4jConfiguration config;
+  public TwitterFollowNeo4j() {
+    this(new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+  }
 
-    public TwitterFollowNeo4j() {
-        this(new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
-    }
+  public TwitterFollowNeo4j(TwitterFollowNeo4jConfiguration config) {
+    this.config = config;
+  }
 
-    public TwitterFollowNeo4j(TwitterFollowNeo4jConfiguration config) {
-        this.config = config;
-    }
+  public void run() {
 
-    public void run() {
+    TwitterFollowingConfiguration twitterFollowingConfiguration = config.getTwitter();
+    TwitterFollowingProvider followingProvider = new TwitterFollowingProvider(twitterFollowingConfiguration);
+    TypeConverterProcessor converter = new TypeConverterProcessor(String.class);
 
-        TwitterFollowingConfiguration twitterFollowingConfiguration = config.getTwitter();
-        TwitterFollowingProvider followingProvider = new TwitterFollowingProvider(twitterFollowingConfiguration);
-        TypeConverterProcessor converter = new TypeConverterProcessor(String.class);
+    ActivityConverterProcessorConfiguration activityConverterProcessorConfiguration =
+        new ActivityConverterProcessorConfiguration()
+            .withClassifiers(Lists.newArrayList((DocumentClassifier) new TwitterDocumentClassifier()))
+            .withConverters(Lists.newArrayList((ActivityConverter) new TwitterFollowActivityConverter()));
+    ActivityConverterProcessor activity = new ActivityConverterProcessor(activityConverterProcessorConfiguration);
 
-        ActivityConverterProcessorConfiguration activityConverterProcessorConfiguration =
-                new ActivityConverterProcessorConfiguration()
-                        .withClassifiers(Lists.newArrayList((DocumentClassifier) new TwitterDocumentClassifier()))
-                        .withConverters(Lists.newArrayList((ActivityConverter) new TwitterFollowActivityConverter()));
-        ActivityConverterProcessor activity = new ActivityConverterProcessor(activityConverterProcessorConfiguration);
+    GraphHttpConfiguration graphWriterConfiguration = config.getGraph();
+    GraphHttpPersistWriter graphPersistWriter = new GraphHttpPersistWriter(graphWriterConfiguration);
 
-        GraphHttpConfiguration graphWriterConfiguration = config.getGraph();
-        GraphHttpPersistWriter graphPersistWriter = new GraphHttpPersistWriter(graphWriterConfiguration);
+    LocalRuntimeConfiguration localRuntimeConfiguration =
+        StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+    StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
 
-        StreamBuilder builder = new LocalStreamBuilder();
-        builder.newPerpetualStream(TwitterFollowingProvider.STREAMS_ID, followingProvider);
-        builder.addStreamsProcessor("converter", converter, 1, TwitterFollowingProvider.STREAMS_ID);
-        builder.addStreamsProcessor("activity", activity, 1, "converter");
-        builder.addStreamsPersistWriter("graph", graphPersistWriter, 1, "activity");
+    builder.newPerpetualStream(TwitterFollowingProvider.class.getCanonicalName(), followingProvider);
+    builder.addStreamsProcessor(TypeConverterProcessor.class.getCanonicalName(), converter, 1, TwitterFollowingProvider.class.getCanonicalName());
+    builder.addStreamsProcessor(ActivityConverterProcessor.class.getCanonicalName(), activity, 1, TypeConverterProcessor.class.getCanonicalName());
+    builder.addStreamsPersistWriter(GraphHttpPersistWriter.class.getCanonicalName(), graphPersistWriter, 1, ActivityConverterProcessor.class.getCanonicalName());
 
-        builder.start();
-    }
+    builder.start();
+  }
 
-    public static void main(String[] args) {
+  public static void main(String[] args) {
 
-        LOGGER.info(StreamsConfigurator.config.toString());
+    LOGGER.info(StreamsConfigurator.config.toString());
 
-        TwitterFollowNeo4j stream = new TwitterFollowNeo4j();
+    TwitterFollowNeo4j stream = new TwitterFollowNeo4j();
 
-        stream.run();
+    stream.run();
 
-    }
+  }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java b/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
index 51593b0..ac9362e 100644
--- a/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
+++ b/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java
@@ -18,15 +18,13 @@
 
 package org.apache.streams.example.test;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
 import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.example.TwitterFollowNeo4j;
 import org.apache.streams.example.TwitterFollowNeo4jConfiguration;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.BeforeClass;
@@ -35,35 +33,35 @@ import org.testng.annotations.Test;
 import java.io.File;
 
 /**
- * Example stream that populates elasticsearch with activities from twitter userstream in real-time
+ * TwitterFollowNeo4jIT is an integration test for TwitterFollowNeo4j.
  */
 public class TwitterFollowNeo4jIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4jIT.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4jIT.class);
 
-    protected TwitterFollowNeo4jConfiguration testConfiguration;
+  protected TwitterFollowNeo4jConfiguration testConfiguration;
 
-    private int count = 0;
+  private int count = 0;
 
-    @BeforeClass
-    public void prepareTest() throws Exception {
+  @BeforeClass
+  public void prepareTest() throws Exception {
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/TwitterFollowNeo4jIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(typesafe);
+    Config reference  = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/TwitterFollowNeo4jIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(typesafe);
 
-    }
+  }
 
-    @Test
-    public void testTwitterFollowGraph() throws Exception {
+  @Test
+  public void testTwitterFollowGraph() throws Exception {
 
-        TwitterFollowNeo4j stream = new TwitterFollowNeo4j(testConfiguration);
+    TwitterFollowNeo4j stream = new TwitterFollowNeo4j(testConfiguration);
 
-        stream.run();
+    stream.run();
 
-    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf
----------------------------------------------------------------------
diff --git a/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf b/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf
index d4b4aeb..346b111 100644
--- a/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf
+++ b/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf
@@ -25,4 +25,5 @@ graph {
   port = ${neo4j.http.port}
   type = "neo4j"
   graph = "data"
-}
\ No newline at end of file
+}
+taskTimeoutMs = 60000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java
----------------------------------------------------------------------
diff --git a/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java b/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java
index 7d87f36..60f3405 100644
--- a/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java
+++ b/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java
@@ -18,14 +18,17 @@
 
 package org.apache.streams.example;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.converter.ActivityConverterProcessor;
 import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
 import org.apache.streams.local.builders.LocalStreamBuilder;
 import org.apache.streams.twitter.provider.TwitterTimelineProvider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,48 +37,48 @@ import org.slf4j.LoggerFactory;
  *
  * Converts them to activities, and writes them in activity format to Elasticsearch.
  */
-
 public class TwitterHistoryElasticsearch implements Runnable {
 
-    public final static String STREAMS_ID = "TwitterHistoryElasticsearch";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearch.class);
+  public final static String STREAMS_ID = "TwitterHistoryElasticsearch";
 
-    private static final ObjectMapper mapper = new ObjectMapper();
+  private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearch.class);
 
-    TwitterHistoryElasticsearchConfiguration config;
+  private static final ObjectMapper mapper = new ObjectMapper();
 
-    public TwitterHistoryElasticsearch() {
-        this(new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+  TwitterHistoryElasticsearchConfiguration config;
 
-    }
+  public TwitterHistoryElasticsearch() {
+    this(new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+  }
 
-    public TwitterHistoryElasticsearch(TwitterHistoryElasticsearchConfiguration config) {
-        this.config = config;
-    }
+  public TwitterHistoryElasticsearch(TwitterHistoryElasticsearchConfiguration config) {
+    this.config = config;
+  }
 
-    public static void main(String[] args)
-    {
-        LOGGER.info(StreamsConfigurator.config.toString());
+  public static void main(String[] args)
+  {
+    LOGGER.info(StreamsConfigurator.config.toString());
 
-        TwitterHistoryElasticsearch history = new TwitterHistoryElasticsearch();
+    TwitterHistoryElasticsearch history = new TwitterHistoryElasticsearch();
 
-        new Thread(history).start();
+    new Thread(history).start();
 
-    }
+  }
 
 
-    public void run() {
+  public void run() {
 
-        TwitterTimelineProvider provider = new TwitterTimelineProvider(config.getTwitter());
-        ActivityConverterProcessor converter = new ActivityConverterProcessor();
-        ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(config.getElasticsearch());
+    TwitterTimelineProvider provider = new TwitterTimelineProvider(config.getTwitter());
+    ActivityConverterProcessor converter = new ActivityConverterProcessor();
+    ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(config.getElasticsearch());
 
-        StreamBuilder builder = new LocalStreamBuilder(500);
+    LocalRuntimeConfiguration localRuntimeConfiguration =
+        StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+    StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
 
-        builder.newPerpetualStream("provider", provider);
-        builder.addStreamsProcessor("converter", converter, 2, "provider");
-        builder.addStreamsPersistWriter("writer", writer, 1, "converter");
-        builder.start();
-    }
+    builder.newPerpetualStream(TwitterTimelineProvider.class.getCanonicalName(), provider);
+    builder.addStreamsProcessor(ActivityConverterProcessor.class.getCanonicalName(), converter, 2, TwitterTimelineProvider.class.getCanonicalName());
+    builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), writer, 1, ActivityConverterProcessor.class.getCanonicalName());
+    builder.start();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java
----------------------------------------------------------------------
diff --git a/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java b/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java
index 07c1d88..0eb022b 100644
--- a/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java
+++ b/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java
@@ -26,7 +26,6 @@ import org.apache.streams.example.TwitterHistoryElasticsearchConfiguration;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -45,62 +44,61 @@ import org.testng.annotations.Test;
 
 import java.io.File;
 
-import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.AssertJUnit.assertTrue;
 
 /**
- * Example stream that populates elasticsearch with activities from twitter userstream in real-time
+ * Example stream that populates elasticsearch with activities from twitter userstream in real-time.
  */
 public class TwitterHistoryElasticsearchIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearchIT.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearchIT.class);
 
-    protected TwitterHistoryElasticsearchConfiguration testConfiguration;
-    protected Client testClient;
+  protected TwitterHistoryElasticsearchConfiguration testConfiguration;
+  protected Client testClient;
 
-    private int count = 0;
+  private int count = 0;
 
-    @BeforeClass
-    public void prepareTest() throws Exception {
+  @BeforeClass
+  public void prepareTest() throws Exception {
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/TwitterHistoryElasticsearchIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(typesafe);
-        testClient = ElasticsearchClientManager.getInstance(testConfiguration.getElasticsearch()).client();
+    Config reference  = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/TwitterHistoryElasticsearchIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(typesafe);
+    testClient = ElasticsearchClientManager.getInstance(testConfiguration.getElasticsearch()).client();
 
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+    ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+    ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+    assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
 
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        if(indicesExistsResponse.isExists()) {
-            DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getElasticsearch().getIndex());
-            DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
-            assertTrue(deleteIndexResponse.isAcknowledged());
-        };
-    }
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    if(indicesExistsResponse.isExists()) {
+      DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getElasticsearch().getIndex());
+      DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
+      assertTrue(deleteIndexResponse.isAcknowledged());
+    };
+  }
 
-    @Test
-    public void testTwitterHistoryElasticsearch() throws Exception {
+  @Test
+  public void testTwitterHistoryElasticsearch() throws Exception {
 
-        TwitterHistoryElasticsearch stream = new TwitterHistoryElasticsearch(testConfiguration);
+    TwitterHistoryElasticsearch stream = new TwitterHistoryElasticsearch(testConfiguration);
 
-        stream.run();
+    stream.run();
 
-        // assert lines in file
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getElasticsearch().getIndex())
-                .setTypes(testConfiguration.getElasticsearch().getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    // assert lines in file
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getElasticsearch().getIndex())
+        .setTypes(testConfiguration.getElasticsearch().getType());
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        count = (int)countResponse.getHits().getTotalHits();
+    count = (int)countResponse.getHits().getTotalHits();
 
-        assertNotEquals(count, 0);
-    }
+    assertNotEquals(count, 0);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/twitter-history-elasticsearch/src/test/resources/TwitterHistoryElasticsearchIT.conf
----------------------------------------------------------------------
diff --git a/local/twitter-history-elasticsearch/src/test/resources/TwitterHistoryElasticsearchIT.conf b/local/twitter-history-elasticsearch/src/test/resources/TwitterHistoryElasticsearchIT.conf
index 1a05e32..81e4903 100644
--- a/local/twitter-history-elasticsearch/src/test/resources/TwitterHistoryElasticsearchIT.conf
+++ b/local/twitter-history-elasticsearch/src/test/resources/TwitterHistoryElasticsearchIT.conf
@@ -26,4 +26,4 @@ elasticsearch {
   index = twitter_history_elasticsearch_it
   type = activity
   forceUseConfig = true
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java
index f1e776a..369ec0b 100644
--- a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java
+++ b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java
@@ -18,129 +18,129 @@
 
 package org.apache.streams.example;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.converter.ActivityConverterProcessor;
+import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.elasticsearch.ElasticsearchPersistDeleter;
 import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
 import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
-import org.apache.streams.example.TwitterUserstreamElasticsearchConfiguration;
 import org.apache.streams.filters.VerbDefinitionDropFilter;
 import org.apache.streams.filters.VerbDefinitionKeepFilter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
 import org.apache.streams.local.builders.LocalStreamBuilder;
-import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.TwitterStreamConfiguration;
 import org.apache.streams.twitter.provider.TwitterStreamProvider;
 import org.apache.streams.verbs.ObjectCombination;
 import org.apache.streams.verbs.VerbDefinition;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.elasticsearch.common.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.Map;
 
 /**
  * Example stream that populates elasticsearch with activities from twitter userstream in real-time
  */
 public class TwitterUserstreamElasticsearch implements Runnable {
 
-    public final static String STREAMS_ID = "TwitterUserstreamElasticsearch";
+  public final static String STREAMS_ID = "TwitterUserstreamElasticsearch";
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearch.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearch.class);
 
-    /* this pattern will match any/only deletes */
-    private static VerbDefinition deleteVerbDefinition =
-            new VerbDefinition()
-            .withValue("delete")
-            .withObjects(Lists.newArrayList(new ObjectCombination()));
+  /* this pattern will match any/only deletes */
+  private static VerbDefinition deleteVerbDefinition =
+      new VerbDefinition()
+          .withValue("delete")
+          .withObjects(Lists.newArrayList(new ObjectCombination()));
 
-    TwitterUserstreamElasticsearchConfiguration config;
+  TwitterUserstreamElasticsearchConfiguration config;
 
-    public TwitterUserstreamElasticsearch() {
-        this(new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+  public TwitterUserstreamElasticsearch() {
+    this(new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
 
-    }
+  }
 
-    public TwitterUserstreamElasticsearch(TwitterUserstreamElasticsearchConfiguration config) {
-        this.config = config;
-    }
+  public TwitterUserstreamElasticsearch(TwitterUserstreamElasticsearchConfiguration config) {
+    this.config = config;
+  }
 
-    public static void main(String[] args)
-    {
-        LOGGER.info(StreamsConfigurator.config.toString());
+  public static void main(String[] args)
+  {
+    LOGGER.info(StreamsConfigurator.config.toString());
 
-        TwitterUserstreamElasticsearch userstream = new TwitterUserstreamElasticsearch();
-        new Thread(userstream).start();
+    TwitterUserstreamElasticsearch userstream = new TwitterUserstreamElasticsearch();
+    new Thread(userstream).start();
 
-    }
+  }
 
-    @Override
-    public void run() {
+  @Override
+  public void run() {
 
-        TwitterStreamConfiguration twitterStreamConfiguration = config.getTwitter();
-        ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = config.getElasticsearch();
+    TwitterStreamConfiguration twitterStreamConfiguration = config.getTwitter();
+    ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = config.getElasticsearch();
 
-        TwitterStreamProvider stream = new TwitterStreamProvider(twitterStreamConfiguration);
-        ActivityConverterProcessor converter = new ActivityConverterProcessor();
-        VerbDefinitionDropFilter noDeletesProcessor = new VerbDefinitionDropFilter(Sets.newHashSet(deleteVerbDefinition));
-        ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(elasticsearchWriterConfiguration);
-        VerbDefinitionKeepFilter deleteOnlyProcessor = new VerbDefinitionKeepFilter(Sets.newHashSet(deleteVerbDefinition));
-        SetDeleteIdProcessor setDeleteIdProcessor = new SetDeleteIdProcessor();
-        ElasticsearchPersistDeleter deleter = new ElasticsearchPersistDeleter(elasticsearchWriterConfiguration);
+    TwitterStreamProvider stream = new TwitterStreamProvider(twitterStreamConfiguration);
+    ActivityConverterProcessor converter = new ActivityConverterProcessor();
+    VerbDefinitionDropFilter noDeletesProcessor = new VerbDefinitionDropFilter(Sets.newHashSet(deleteVerbDefinition));
+    ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(elasticsearchWriterConfiguration);
+    VerbDefinitionKeepFilter deleteOnlyProcessor = new VerbDefinitionKeepFilter(Sets.newHashSet(deleteVerbDefinition));
+    SetDeleteIdProcessor setDeleteIdProcessor = new SetDeleteIdProcessor();
+    ElasticsearchPersistDeleter deleter = new ElasticsearchPersistDeleter(elasticsearchWriterConfiguration);
 
-        Map<String, Object> streamConfig = Maps.newHashMap();
-        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 12 * 60 * 1000);
-        StreamBuilder builder = new LocalStreamBuilder(25, streamConfig);
+    LocalRuntimeConfiguration localRuntimeConfiguration =
+        StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+    StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
 
-        builder.newPerpetualStream(TwitterStreamProvider.STREAMS_ID, stream);
-        builder.addStreamsProcessor("converter", converter, 2, TwitterStreamProvider.STREAMS_ID);
-        builder.addStreamsProcessor("NoDeletesProcessor", noDeletesProcessor, 1, "converter");
-        builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, writer, 1, "NoDeletesProcessor");
-        builder.addStreamsProcessor("DeleteOnlyProcessor", deleteOnlyProcessor, 1, "converter");
-        builder.addStreamsProcessor("SetDeleteIdProcessor", setDeleteIdProcessor, 1, "DeleteOnlyProcessor");
-        builder.addStreamsPersistWriter("deleter", deleter, 1, "SetDeleteIdProcessor");
+    builder.newPerpetualStream(TwitterStreamProvider.class.getCanonicalName(), stream);
+    builder.addStreamsProcessor(ActivityConverterProcessor.class.getCanonicalName(), converter, 2, TwitterStreamProvider.class.getCanonicalName());
+    builder.addStreamsProcessor(VerbDefinitionDropFilter.class.getCanonicalName(), noDeletesProcessor, 1, ActivityConverterProcessor.class.getCanonicalName());
+    builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), writer, 1, VerbDefinitionDropFilter.class.getCanonicalName());
+    builder.addStreamsProcessor(VerbDefinitionKeepFilter.class.getCanonicalName(), deleteOnlyProcessor, 1, ActivityConverterProcessor.class.getCanonicalName());
+    builder.addStreamsProcessor(SetDeleteIdProcessor.class.getCanonicalName(), setDeleteIdProcessor, 1, VerbDefinitionKeepFilter.class.getCanonicalName());
+    builder.addStreamsPersistWriter(ElasticsearchPersistDeleter.class.getCanonicalName(), deleter, 1, SetDeleteIdProcessor.class.getCanonicalName());
 
-        builder.start();
+    builder.start();
 
-    }
+  }
 
-    protected class SetDeleteIdProcessor implements StreamsProcessor {
+  protected class SetDeleteIdProcessor implements StreamsProcessor {
 
-        public String getId() {
-            return "TwitterUserstreamElasticsearch.SetDeleteIdProcessor";
-        }
+    public String getId() {
+      return "TwitterUserstreamElasticsearch.SetDeleteIdProcessor";
+    }
 
-        @Override
-        public List<StreamsDatum> process(StreamsDatum entry) {
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
 
-            Preconditions.checkArgument(entry.getDocument() instanceof Activity);
-            String id = entry.getId();
-            // replace delete with post in id
-            // ensure ElasticsearchPersistDeleter will remove original post if present
-            id = Strings.replace(id, "delete", "post");
-            entry.setId(id);
+      Preconditions.checkArgument(entry.getDocument() instanceof Activity);
+      String id = entry.getId();
+      // replace delete with post in id
+      // ensure ElasticsearchPersistDeleter will remove original post if present
+      id = Strings.replace(id, "delete", "post");
+      entry.setId(id);
 
-            return Lists.newArrayList(entry);
-        }
+      return Lists.newArrayList(entry);
+    }
 
-        @Override
-        public void prepare(Object configurationObject) {
+    @Override
+    public void prepare(Object configurationObject) {
 
 
-        }
+    }
 
-        @Override
-        public void cleanUp() {
+    @Override
+    public void cleanUp() {
 
-        }
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java
index 2fd26db..63dd8de 100644
--- a/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java
+++ b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java
@@ -26,7 +26,6 @@ import org.apache.streams.example.TwitterUserstreamElasticsearchConfiguration;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -45,7 +44,6 @@ import org.testng.annotations.Test;
 
 import java.io.File;
 
-import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.AssertJUnit.assertTrue;
 
@@ -54,55 +52,55 @@ import static org.testng.AssertJUnit.assertTrue;
  */
 public class TwitterUserstreamElasticsearchIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearchIT.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearchIT.class);
 
-    protected TwitterUserstreamElasticsearchConfiguration testConfiguration;
-    protected Client testClient;
+  protected TwitterUserstreamElasticsearchConfiguration testConfiguration;
+  protected Client testClient;
 
-    private int count = 0;
+  private int count = 0;
 
-    @BeforeClass
-    public void prepareTest() throws Exception {
+  @BeforeClass
+  public void prepareTest() throws Exception {
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/TwitterUserstreamElasticsearchIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(typesafe);
-        testClient = ElasticsearchClientManager.getInstance(testConfiguration.getElasticsearch()).client();
+    Config reference  = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/TwitterUserstreamElasticsearchIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(typesafe);
+    testClient = ElasticsearchClientManager.getInstance(testConfiguration.getElasticsearch()).client();
 
-        ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+    ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+    ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+    assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
 
-        IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
-        IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        if(indicesExistsResponse.isExists()) {
-            DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getElasticsearch().getIndex());
-            DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
-            assertTrue(deleteIndexResponse.isAcknowledged());
-        };
+    IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex());
+    IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    if(indicesExistsResponse.isExists()) {
+      DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getElasticsearch().getIndex());
+      DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
+      assertTrue(deleteIndexResponse.isAcknowledged());
+    };
 
-    }
+  }
 
-    @Test
-    public void testUserstreamElasticsearch() throws Exception {
+  @Test
+  public void testUserstreamElasticsearch() throws Exception {
 
-        TwitterUserstreamElasticsearch stream = new TwitterUserstreamElasticsearch(testConfiguration);
+    TwitterUserstreamElasticsearch stream = new TwitterUserstreamElasticsearch(testConfiguration);
 
-        Thread thread = new Thread(stream);
-        thread.start();
-        thread.join(30000);
+    Thread thread = new Thread(stream);
+    thread.start();
+    thread.join(30000);
 
-        // assert lines in file
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getElasticsearch().getIndex())
-                .setTypes(testConfiguration.getElasticsearch().getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    // assert lines in file
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getElasticsearch().getIndex())
+        .setTypes(testConfiguration.getElasticsearch().getType());
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        count = (int)countResponse.getHits().getTotalHits();
+    count = (int)countResponse.getHits().getTotalHits();
 
-        assertNotEquals(count, 0);
-    }
+    assertNotEquals(count, 0);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf b/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf
index df9be4d..bca2d51 100644
--- a/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf
+++ b/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf
@@ -26,4 +26,5 @@ elasticsearch {
   index = twitter_userstream_elasticsearch_it
   type = activity
   forceUseConfig = true
-}
\ No newline at end of file
+}
+taskTimeoutMs = 60000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e369e36..384d71a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -263,10 +263,88 @@
     </dependencyManagement>
 
     <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.scalastyle</groupId>
+                <artifactId>scalastyle-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <artifactId>maven-enforcer-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <artifactId>maven-remote-resources-plugin</artifactId>
+            </plugin>
+        </plugins>
         <pluginManagement>
             <plugins>
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-checkstyle-plugin</artifactId>
+                    <version>${checkstyle.plugin.version}</version>
+                    <dependencies>
+                        <dependency>
+                            <groupId>com.puppycrawl.tools</groupId>
+                            <artifactId>checkstyle</artifactId>
+                            <version>7.2</version>
+                        </dependency>
+                    </dependencies>
+                    <executions>
+                        <execution>
+                            <id>validate</id>
+                            <phase>validate</phase>
+                            <configuration>
+                                <configLocation>http://streams.incubator.apache.org/site/${project.version}/streams-master/streams-java-checkstyle.xml</configLocation>
+                                <encoding>UTF-8</encoding>
+                                <consoleOutput>true</consoleOutput>
+                                <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                                <testSourceDirectory>${project.basedir}/src/test/java</testSourceDirectory>
+                                <failsOnError>false</failsOnError>
+                            </configuration>
+                            <goals>
+                                <goal>check</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                </plugin>
+                <plugin>
+                    <groupId>org.scalastyle</groupId>
+                    <artifactId>scalastyle-maven-plugin</artifactId>
+                    <version>${scalastyle.plugin.version}</version>
+                    <executions>
+                        <execution>
+                            <id>validate</id>
+                            <phase>validate</phase>
+                            <configuration>
+                                <verbose>false</verbose>
+                                <failOnViolation>false</failOnViolation>
+                                <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                                <failOnWarning>false</failOnWarning>
+                                <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
+                                <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
+                                <!--<configLocation>https://raw.githubusercontent.com/databricks/sbt-databricks/master/scalastyle-config.xml</configLocation>-->
+                                <outputFile>${project.build.directory}/scalastyle-output.xml</outputFile>
+                                <outputEncoding>UTF-8</outputEncoding>
+                            </configuration>
+                            <goals>
+                                <goal>check</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-shade-plugin</artifactId>
                     <version>${shade.plugin.version}</version>
                     <configuration>


Mime
View raw message