Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 22CD8200BE0 for ; Sat, 17 Dec 2016 20:59:05 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 2153C160B28; Sat, 17 Dec 2016 19:59:05 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2D352160B0B for ; Sat, 17 Dec 2016 20:59:03 +0100 (CET) Received: (qmail 7163 invoked by uid 500); 17 Dec 2016 19:59:02 -0000 Mailing-List: contact commits-help@streams.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@streams.incubator.apache.org Delivered-To: mailing list commits@streams.incubator.apache.org Received: (qmail 7153 invoked by uid 99); 17 Dec 2016 19:59:02 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 17 Dec 2016 19:59:02 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id AE244C14D6 for ; Sat, 17 Dec 2016 19:59:01 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.218 X-Spam-Level: X-Spam-Status: No, score=-6.218 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id mQG49M5DN4I0 for ; Sat, 17 Dec 2016 19:58:57 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 1E85C5FACC for ; Sat, 17 Dec 2016 19:58:54 +0000 (UTC) Received: (qmail 6618 invoked by uid 99); 17 Dec 2016 19:58:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 17 Dec 2016 19:58:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 317E5DFBA0; Sat, 17 Dec 2016 19:58:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sblackmon@apache.org To: commits@streams.incubator.apache.org Date: Sat, 17 Dec 2016 19:59:00 -0000 Message-Id: <33203090ce034d25b0bb609e1b307f13@git.apache.org> In-Reply-To: <70996b070bbb406985b5a71927a839cd@git.apache.org> References: <70996b070bbb406985b5a71927a839cd@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [7/9] incubator-streams-examples git commit: STREAMS-474: Apply check style requirements to examples archived-at: Sat, 17 Dec 2016 19:59:05 -0000 http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/MongoElasticsearchSync.java ---------------------------------------------------------------------- diff --git a/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/MongoElasticsearchSync.java b/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/MongoElasticsearchSync.java index 4527a6b..2d994b9 100644 --- a/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/MongoElasticsearchSync.java +++ b/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/MongoElasticsearchSync.java @@ -22,61 +22,56 @@ import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamBuilder; import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; +import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.local.LocalRuntimeConfiguration; import org.apache.streams.local.builders.LocalStreamBuilder; import org.apache.streams.mongo.MongoPersistReader; -import com.google.common.collect.Maps; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; - /** - * Copies documents into a new index + * Copies a mongo collection to an elasticsearch index. */ public class MongoElasticsearchSync implements Runnable { - public final static String STREAMS_ID = "MongoElasticsearchSync"; + public final static String STREAMS_ID = "MongoElasticsearchSync"; - private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSync.class); + private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSync.class); - MongoElasticsearchSyncConfiguration config; + MongoElasticsearchSyncConfiguration config; - public MongoElasticsearchSync() { - this(new ComponentConfigurator(MongoElasticsearchSyncConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); - } + public MongoElasticsearchSync() { + this(new ComponentConfigurator(MongoElasticsearchSyncConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); + } - public MongoElasticsearchSync(MongoElasticsearchSyncConfiguration config) { - this.config = config; - } + public MongoElasticsearchSync(MongoElasticsearchSyncConfiguration config) { + this.config = config; + } - public static void main(String[] args) - { - LOGGER.info(StreamsConfigurator.config.toString()); + public static void main(String[] args) + { + LOGGER.info(StreamsConfigurator.config.toString()); - MongoElasticsearchSync sync = new MongoElasticsearchSync(); + MongoElasticsearchSync sync = new MongoElasticsearchSync(); - new Thread(sync).start(); + new Thread(sync).start(); - } + } - @Override - public void run() { + @Override + public void run() { - MongoPersistReader mongoPersistReader = new MongoPersistReader(config.getSource()); + MongoPersistReader mongoPersistReader = new MongoPersistReader(config.getSource()); - ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination()); + ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination()); - LocalRuntimeConfiguration localRuntimeConfiguration = new LocalRuntimeConfiguration(); - localRuntimeConfiguration.setIdentifier(STREAMS_ID); - localRuntimeConfiguration.setTaskTimeoutMs((long)(60 * 1000)); - localRuntimeConfiguration.setQueueSize((long)1000); - StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration); + LocalRuntimeConfiguration localRuntimeConfiguration = + StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class); + StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration); - builder.newPerpetualStream(MongoPersistReader.class.getCanonicalName(), mongoPersistReader); - builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, MongoPersistReader.class.getCanonicalName()); - builder.start(); - } + builder.newPerpetualStream(MongoPersistReader.class.getCanonicalName(), mongoPersistReader); + builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, MongoPersistReader.class.getCanonicalName()); + builder.start(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/test/MongoElasticsearchSyncIT.java ---------------------------------------------------------------------- diff --git a/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/test/MongoElasticsearchSyncIT.java b/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/test/MongoElasticsearchSyncIT.java index 02af293..84d0fba 100644 --- a/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/test/MongoElasticsearchSyncIT.java +++ b/local/mongo-elasticsearch-sync/src/test/java/org/apache/streams/example/test/MongoElasticsearchSyncIT.java @@ -28,7 +28,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; - import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; @@ -51,55 +50,55 @@ import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; /** - * Test copying documents between two indexes on same cluster + * MongoElasticsearchSyncIT is an integration test for MongoElasticsearchSync. */ public class MongoElasticsearchSyncIT { - private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSyncIT.class); + private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSyncIT.class); - ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - protected MongoElasticsearchSyncConfiguration testConfiguration; - protected Client testClient; + protected MongoElasticsearchSyncConfiguration testConfiguration; + protected Client testClient; - @BeforeClass - public void prepareTest() throws Exception { + @BeforeClass + public void prepareTest() throws Exception { - Config reference = ConfigFactory.load(); - File conf_file = new File("target/test-classes/MongoElasticsearchSyncIT.conf"); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - testConfiguration = new ComponentConfigurator<>(MongoElasticsearchSyncConfiguration.class).detectConfiguration(typesafe); - testClient = ElasticsearchClientManager.getInstance(testConfiguration.getDestination()).client(); + Config reference = ConfigFactory.load(); + File conf_file = new File("target/test-classes/MongoElasticsearchSyncIT.conf"); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + testConfiguration = new ComponentConfigurator<>(MongoElasticsearchSyncConfiguration.class).detectConfiguration(typesafe); + testClient = ElasticsearchClientManager.getInstance(testConfiguration.getDestination()).client(); - ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); - ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); - assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); + ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); + ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); + assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); - IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex()); - IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); - assertFalse(indicesExistsResponse.isExists()); - } + IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex()); + IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); + assertFalse(indicesExistsResponse.isExists()); + } - @Test - public void testSync() throws Exception { + @Test + public void testSync() throws Exception { - MongoElasticsearchSync sync = new MongoElasticsearchSync(testConfiguration); + MongoElasticsearchSync sync = new MongoElasticsearchSync(testConfiguration); - sync.run(); + sync.run(); - IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex()); - IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); - assertTrue(indicesExistsResponse.isExists()); + IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex()); + IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); + assertTrue(indicesExistsResponse.isExists()); - // assert lines in file - SearchRequestBuilder countRequest = testClient - .prepareSearch(testConfiguration.getDestination().getIndex()) - .setTypes(testConfiguration.getDestination().getType()); - SearchResponse countResponse = countRequest.execute().actionGet(); + // assert lines in file + SearchRequestBuilder countRequest = testClient + .prepareSearch(testConfiguration.getDestination().getIndex()) + .setTypes(testConfiguration.getDestination().getType()); + SearchResponse countResponse = countRequest.execute().actionGet(); - assertEquals(89, (int)countResponse.getHits().getTotalHits()); + assertEquals(89, (int)countResponse.getHits().getTotalHits()); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/mongo-elasticsearch-sync/src/test/resources/MongoElasticsearchSyncIT.conf ---------------------------------------------------------------------- diff --git a/local/mongo-elasticsearch-sync/src/test/resources/MongoElasticsearchSyncIT.conf b/local/mongo-elasticsearch-sync/src/test/resources/MongoElasticsearchSyncIT.conf index 61e61d7..86a41b6 100644 --- a/local/mongo-elasticsearch-sync/src/test/resources/MongoElasticsearchSyncIT.conf +++ b/local/mongo-elasticsearch-sync/src/test/resources/MongoElasticsearchSyncIT.conf @@ -30,4 +30,5 @@ "type": "activity", "forceUseConfig": true } + taskTimeoutMs = 60000 } http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java ---------------------------------------------------------------------- diff --git a/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java b/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java index 34ac8c4..5ffb6ed 100644 --- a/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java +++ b/local/twitter-follow-neo4j/src/main/java/org/apache/streams/example/TwitterFollowNeo4j.java @@ -18,7 +18,6 @@ package org.apache.streams.example; -import com.google.common.collect.Lists; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.converter.ActivityConverterProcessor; @@ -27,14 +26,17 @@ import org.apache.streams.converter.TypeConverterProcessor; import org.apache.streams.core.StreamBuilder; import org.apache.streams.data.ActivityConverter; import org.apache.streams.data.DocumentClassifier; -import org.apache.streams.example.TwitterFollowNeo4jConfiguration; import org.apache.streams.graph.GraphHttpConfiguration; import org.apache.streams.graph.GraphHttpPersistWriter; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.local.LocalRuntimeConfiguration; import org.apache.streams.local.builders.LocalStreamBuilder; import org.apache.streams.twitter.TwitterFollowingConfiguration; +import org.apache.streams.twitter.converter.TwitterDocumentClassifier; import org.apache.streams.twitter.converter.TwitterFollowActivityConverter; import org.apache.streams.twitter.provider.TwitterFollowingProvider; -import org.apache.streams.twitter.converter.TwitterDocumentClassifier; + +import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,50 +46,53 @@ import org.slf4j.LoggerFactory; */ public class TwitterFollowNeo4j implements Runnable { - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4j.class); + private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4j.class); + + TwitterFollowNeo4jConfiguration config; - TwitterFollowNeo4jConfiguration config; + public TwitterFollowNeo4j() { + this(new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); + } - public TwitterFollowNeo4j() { - this(new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); - } + public TwitterFollowNeo4j(TwitterFollowNeo4jConfiguration config) { + this.config = config; + } - public TwitterFollowNeo4j(TwitterFollowNeo4jConfiguration config) { - this.config = config; - } + public void run() { - public void run() { + TwitterFollowingConfiguration twitterFollowingConfiguration = config.getTwitter(); + TwitterFollowingProvider followingProvider = new TwitterFollowingProvider(twitterFollowingConfiguration); + TypeConverterProcessor converter = new TypeConverterProcessor(String.class); - TwitterFollowingConfiguration twitterFollowingConfiguration = config.getTwitter(); - TwitterFollowingProvider followingProvider = new TwitterFollowingProvider(twitterFollowingConfiguration); - TypeConverterProcessor converter = new TypeConverterProcessor(String.class); + ActivityConverterProcessorConfiguration activityConverterProcessorConfiguration = + new ActivityConverterProcessorConfiguration() + .withClassifiers(Lists.newArrayList((DocumentClassifier) new TwitterDocumentClassifier())) + .withConverters(Lists.newArrayList((ActivityConverter) new TwitterFollowActivityConverter())); + ActivityConverterProcessor activity = new ActivityConverterProcessor(activityConverterProcessorConfiguration); - ActivityConverterProcessorConfiguration activityConverterProcessorConfiguration = - new ActivityConverterProcessorConfiguration() - .withClassifiers(Lists.newArrayList((DocumentClassifier) new TwitterDocumentClassifier())) - .withConverters(Lists.newArrayList((ActivityConverter) new TwitterFollowActivityConverter())); - ActivityConverterProcessor activity = new ActivityConverterProcessor(activityConverterProcessorConfiguration); + GraphHttpConfiguration graphWriterConfiguration = config.getGraph(); + GraphHttpPersistWriter graphPersistWriter = new GraphHttpPersistWriter(graphWriterConfiguration); - GraphHttpConfiguration graphWriterConfiguration = config.getGraph(); - GraphHttpPersistWriter graphPersistWriter = new GraphHttpPersistWriter(graphWriterConfiguration); + LocalRuntimeConfiguration localRuntimeConfiguration = + StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class); + StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration); - StreamBuilder builder = new LocalStreamBuilder(); - builder.newPerpetualStream(TwitterFollowingProvider.STREAMS_ID, followingProvider); - builder.addStreamsProcessor("converter", converter, 1, TwitterFollowingProvider.STREAMS_ID); - builder.addStreamsProcessor("activity", activity, 1, "converter"); - builder.addStreamsPersistWriter("graph", graphPersistWriter, 1, "activity"); + builder.newPerpetualStream(TwitterFollowingProvider.class.getCanonicalName(), followingProvider); + builder.addStreamsProcessor(TypeConverterProcessor.class.getCanonicalName(), converter, 1, TwitterFollowingProvider.class.getCanonicalName()); + builder.addStreamsProcessor(ActivityConverterProcessor.class.getCanonicalName(), activity, 1, TypeConverterProcessor.class.getCanonicalName()); + builder.addStreamsPersistWriter(GraphHttpPersistWriter.class.getCanonicalName(), graphPersistWriter, 1, ActivityConverterProcessor.class.getCanonicalName()); - builder.start(); - } + builder.start(); + } - public static void main(String[] args) { + public static void main(String[] args) { - LOGGER.info(StreamsConfigurator.config.toString()); + LOGGER.info(StreamsConfigurator.config.toString()); - TwitterFollowNeo4j stream = new TwitterFollowNeo4j(); + TwitterFollowNeo4j stream = new TwitterFollowNeo4j(); - stream.run(); + stream.run(); - } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java ---------------------------------------------------------------------- diff --git a/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java b/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java index 51593b0..ac9362e 100644 --- a/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java +++ b/local/twitter-follow-neo4j/src/test/java/org/apache/streams/example/test/TwitterFollowNeo4jIT.java @@ -18,15 +18,13 @@ package org.apache.streams.example.test; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.example.TwitterFollowNeo4j; import org.apache.streams.example.TwitterFollowNeo4jConfiguration; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.BeforeClass; @@ -35,35 +33,35 @@ import org.testng.annotations.Test; import java.io.File; /** - * Example stream that populates elasticsearch with activities from twitter userstream in real-time + * TwitterFollowNeo4jIT is an integration test for TwitterFollowNeo4j. */ public class TwitterFollowNeo4jIT { - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4jIT.class); + private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowNeo4jIT.class); - protected TwitterFollowNeo4jConfiguration testConfiguration; + protected TwitterFollowNeo4jConfiguration testConfiguration; - private int count = 0; + private int count = 0; - @BeforeClass - public void prepareTest() throws Exception { + @BeforeClass + public void prepareTest() throws Exception { - Config reference = ConfigFactory.load(); - File conf_file = new File("target/test-classes/TwitterFollowNeo4jIT.conf"); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - testConfiguration = new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(typesafe); + Config reference = ConfigFactory.load(); + File conf_file = new File("target/test-classes/TwitterFollowNeo4jIT.conf"); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + testConfiguration = new ComponentConfigurator<>(TwitterFollowNeo4jConfiguration.class).detectConfiguration(typesafe); - } + } - @Test - public void testTwitterFollowGraph() throws Exception { + @Test + public void testTwitterFollowGraph() throws Exception { - TwitterFollowNeo4j stream = new TwitterFollowNeo4j(testConfiguration); + TwitterFollowNeo4j stream = new TwitterFollowNeo4j(testConfiguration); - stream.run(); + stream.run(); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf ---------------------------------------------------------------------- diff --git a/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf b/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf index d4b4aeb..346b111 100644 --- a/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf +++ b/local/twitter-follow-neo4j/src/test/resources/TwitterFollowNeo4jIT.conf @@ -25,4 +25,5 @@ graph { port = ${neo4j.http.port} type = "neo4j" graph = "data" -} \ No newline at end of file +} +taskTimeoutMs = 60000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java ---------------------------------------------------------------------- diff --git a/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java b/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java index 7d87f36..60f3405 100644 --- a/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java +++ b/local/twitter-history-elasticsearch/src/main/java/org/apache/streams/example/TwitterHistoryElasticsearch.java @@ -18,14 +18,17 @@ package org.apache.streams.example; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.converter.ActivityConverterProcessor; import org.apache.streams.core.StreamBuilder; import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.local.LocalRuntimeConfiguration; import org.apache.streams.local.builders.LocalStreamBuilder; import org.apache.streams.twitter.provider.TwitterTimelineProvider; + +import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,48 +37,48 @@ import org.slf4j.LoggerFactory; * * Converts them to activities, and writes them in activity format to Elasticsearch. */ - public class TwitterHistoryElasticsearch implements Runnable { - public final static String STREAMS_ID = "TwitterHistoryElasticsearch"; - - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearch.class); + public final static String STREAMS_ID = "TwitterHistoryElasticsearch"; - private static final ObjectMapper mapper = new ObjectMapper(); + private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearch.class); - TwitterHistoryElasticsearchConfiguration config; + private static final ObjectMapper mapper = new ObjectMapper(); - public TwitterHistoryElasticsearch() { - this(new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); + TwitterHistoryElasticsearchConfiguration config; - } + public TwitterHistoryElasticsearch() { + this(new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); + } - public TwitterHistoryElasticsearch(TwitterHistoryElasticsearchConfiguration config) { - this.config = config; - } + public TwitterHistoryElasticsearch(TwitterHistoryElasticsearchConfiguration config) { + this.config = config; + } - public static void main(String[] args) - { - LOGGER.info(StreamsConfigurator.config.toString()); + public static void main(String[] args) + { + LOGGER.info(StreamsConfigurator.config.toString()); - TwitterHistoryElasticsearch history = new TwitterHistoryElasticsearch(); + TwitterHistoryElasticsearch history = new TwitterHistoryElasticsearch(); - new Thread(history).start(); + new Thread(history).start(); - } + } - public void run() { + public void run() { - TwitterTimelineProvider provider = new TwitterTimelineProvider(config.getTwitter()); - ActivityConverterProcessor converter = new ActivityConverterProcessor(); - ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(config.getElasticsearch()); + TwitterTimelineProvider provider = new TwitterTimelineProvider(config.getTwitter()); + ActivityConverterProcessor converter = new ActivityConverterProcessor(); + ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(config.getElasticsearch()); - StreamBuilder builder = new LocalStreamBuilder(500); + LocalRuntimeConfiguration localRuntimeConfiguration = + StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class); + StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration); - builder.newPerpetualStream("provider", provider); - builder.addStreamsProcessor("converter", converter, 2, "provider"); - builder.addStreamsPersistWriter("writer", writer, 1, "converter"); - builder.start(); - } + builder.newPerpetualStream(TwitterTimelineProvider.class.getCanonicalName(), provider); + builder.addStreamsProcessor(ActivityConverterProcessor.class.getCanonicalName(), converter, 2, TwitterTimelineProvider.class.getCanonicalName()); + builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), writer, 1, ActivityConverterProcessor.class.getCanonicalName()); + builder.start(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java ---------------------------------------------------------------------- diff --git a/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java b/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java index 07c1d88..0eb022b 100644 --- a/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java +++ b/local/twitter-history-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterHistoryElasticsearchIT.java @@ -26,7 +26,6 @@ import org.apache.streams.example.TwitterHistoryElasticsearchConfiguration; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; - import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; @@ -45,62 +44,61 @@ import org.testng.annotations.Test; import java.io.File; -import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.AssertJUnit.assertTrue; /** - * Example stream that populates elasticsearch with activities from twitter userstream in real-time + * Example stream that populates elasticsearch with activities from twitter userstream in real-time. */ public class TwitterHistoryElasticsearchIT { - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearchIT.class); + private final static Logger LOGGER = LoggerFactory.getLogger(TwitterHistoryElasticsearchIT.class); - protected TwitterHistoryElasticsearchConfiguration testConfiguration; - protected Client testClient; + protected TwitterHistoryElasticsearchConfiguration testConfiguration; + protected Client testClient; - private int count = 0; + private int count = 0; - @BeforeClass - public void prepareTest() throws Exception { + @BeforeClass + public void prepareTest() throws Exception { - Config reference = ConfigFactory.load(); - File conf_file = new File("target/test-classes/TwitterHistoryElasticsearchIT.conf"); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - testConfiguration = new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(typesafe); - testClient = ElasticsearchClientManager.getInstance(testConfiguration.getElasticsearch()).client(); + Config reference = ConfigFactory.load(); + File conf_file = new File("target/test-classes/TwitterHistoryElasticsearchIT.conf"); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + testConfiguration = new ComponentConfigurator<>(TwitterHistoryElasticsearchConfiguration.class).detectConfiguration(typesafe); + testClient = ElasticsearchClientManager.getInstance(testConfiguration.getElasticsearch()).client(); - ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); - ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); - assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); + ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); + ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); + assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); - IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex()); - IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); - if(indicesExistsResponse.isExists()) { - DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getElasticsearch().getIndex()); - DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet(); - assertTrue(deleteIndexResponse.isAcknowledged()); - }; - } + IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex()); + IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); + if(indicesExistsResponse.isExists()) { + DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getElasticsearch().getIndex()); + DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet(); + assertTrue(deleteIndexResponse.isAcknowledged()); + }; + } - @Test - public void testTwitterHistoryElasticsearch() throws Exception { + @Test + public void testTwitterHistoryElasticsearch() throws Exception { - TwitterHistoryElasticsearch stream = new TwitterHistoryElasticsearch(testConfiguration); + TwitterHistoryElasticsearch stream = new TwitterHistoryElasticsearch(testConfiguration); - stream.run(); + stream.run(); - // assert lines in file - SearchRequestBuilder countRequest = testClient - .prepareSearch(testConfiguration.getElasticsearch().getIndex()) - .setTypes(testConfiguration.getElasticsearch().getType()); - SearchResponse countResponse = countRequest.execute().actionGet(); + // assert lines in file + SearchRequestBuilder countRequest = testClient + .prepareSearch(testConfiguration.getElasticsearch().getIndex()) + .setTypes(testConfiguration.getElasticsearch().getType()); + SearchResponse countResponse = countRequest.execute().actionGet(); - count = (int)countResponse.getHits().getTotalHits(); + count = (int)countResponse.getHits().getTotalHits(); - assertNotEquals(count, 0); - } + assertNotEquals(count, 0); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/twitter-history-elasticsearch/src/test/resources/TwitterHistoryElasticsearchIT.conf ---------------------------------------------------------------------- diff --git a/local/twitter-history-elasticsearch/src/test/resources/TwitterHistoryElasticsearchIT.conf b/local/twitter-history-elasticsearch/src/test/resources/TwitterHistoryElasticsearchIT.conf index 1a05e32..81e4903 100644 --- a/local/twitter-history-elasticsearch/src/test/resources/TwitterHistoryElasticsearchIT.conf +++ b/local/twitter-history-elasticsearch/src/test/resources/TwitterHistoryElasticsearchIT.conf @@ -26,4 +26,4 @@ elasticsearch { index = twitter_history_elasticsearch_it type = activity forceUseConfig = true -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java ---------------------------------------------------------------------- diff --git a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java index f1e776a..369ec0b 100644 --- a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java +++ b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/example/TwitterUserstreamElasticsearch.java @@ -18,129 +18,129 @@ package org.apache.streams.example; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.converter.ActivityConverterProcessor; +import org.apache.streams.core.StreamBuilder; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.elasticsearch.ElasticsearchPersistDeleter; import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration; -import org.apache.streams.example.TwitterUserstreamElasticsearchConfiguration; import org.apache.streams.filters.VerbDefinitionDropFilter; import org.apache.streams.filters.VerbDefinitionKeepFilter; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.local.LocalRuntimeConfiguration; import org.apache.streams.local.builders.LocalStreamBuilder; -import org.apache.streams.core.StreamBuilder; import org.apache.streams.pojo.json.Activity; import org.apache.streams.twitter.TwitterStreamConfiguration; import org.apache.streams.twitter.provider.TwitterStreamProvider; import org.apache.streams.verbs.ObjectCombination; import org.apache.streams.verbs.VerbDefinition; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.elasticsearch.common.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; -import java.util.Map; /** * Example stream that populates elasticsearch with activities from twitter userstream in real-time */ public class TwitterUserstreamElasticsearch implements Runnable { - public final static String STREAMS_ID = "TwitterUserstreamElasticsearch"; + public final static String STREAMS_ID = "TwitterUserstreamElasticsearch"; - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearch.class); + private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearch.class); - /* this pattern will match any/only deletes */ - private static VerbDefinition deleteVerbDefinition = - new VerbDefinition() - .withValue("delete") - .withObjects(Lists.newArrayList(new ObjectCombination())); + /* this pattern will match any/only deletes */ + private static VerbDefinition deleteVerbDefinition = + new VerbDefinition() + .withValue("delete") + .withObjects(Lists.newArrayList(new ObjectCombination())); - TwitterUserstreamElasticsearchConfiguration config; + TwitterUserstreamElasticsearchConfiguration config; - public TwitterUserstreamElasticsearch() { - this(new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); + public TwitterUserstreamElasticsearch() { + this(new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig())); - } + } - public TwitterUserstreamElasticsearch(TwitterUserstreamElasticsearchConfiguration config) { - this.config = config; - } + public TwitterUserstreamElasticsearch(TwitterUserstreamElasticsearchConfiguration config) { + this.config = config; + } - public static void main(String[] args) - { - LOGGER.info(StreamsConfigurator.config.toString()); + public static void main(String[] args) + { + LOGGER.info(StreamsConfigurator.config.toString()); - TwitterUserstreamElasticsearch userstream = new TwitterUserstreamElasticsearch(); - new Thread(userstream).start(); + TwitterUserstreamElasticsearch userstream = new TwitterUserstreamElasticsearch(); + new Thread(userstream).start(); - } + } - @Override - public void run() { + @Override + public void run() { - TwitterStreamConfiguration twitterStreamConfiguration = config.getTwitter(); - ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = config.getElasticsearch(); + TwitterStreamConfiguration twitterStreamConfiguration = config.getTwitter(); + ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = config.getElasticsearch(); - TwitterStreamProvider stream = new TwitterStreamProvider(twitterStreamConfiguration); - ActivityConverterProcessor converter = new ActivityConverterProcessor(); - VerbDefinitionDropFilter noDeletesProcessor = new VerbDefinitionDropFilter(Sets.newHashSet(deleteVerbDefinition)); - ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(elasticsearchWriterConfiguration); - VerbDefinitionKeepFilter deleteOnlyProcessor = new VerbDefinitionKeepFilter(Sets.newHashSet(deleteVerbDefinition)); - SetDeleteIdProcessor setDeleteIdProcessor = new SetDeleteIdProcessor(); - ElasticsearchPersistDeleter deleter = new ElasticsearchPersistDeleter(elasticsearchWriterConfiguration); + TwitterStreamProvider stream = new TwitterStreamProvider(twitterStreamConfiguration); + ActivityConverterProcessor converter = new ActivityConverterProcessor(); + VerbDefinitionDropFilter noDeletesProcessor = new VerbDefinitionDropFilter(Sets.newHashSet(deleteVerbDefinition)); + ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(elasticsearchWriterConfiguration); + VerbDefinitionKeepFilter deleteOnlyProcessor = new VerbDefinitionKeepFilter(Sets.newHashSet(deleteVerbDefinition)); + SetDeleteIdProcessor setDeleteIdProcessor = new SetDeleteIdProcessor(); + ElasticsearchPersistDeleter deleter = new ElasticsearchPersistDeleter(elasticsearchWriterConfiguration); - Map streamConfig = Maps.newHashMap(); - streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 12 * 60 * 1000); - StreamBuilder builder = new LocalStreamBuilder(25, streamConfig); + LocalRuntimeConfiguration localRuntimeConfiguration = + StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class); + StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration); - builder.newPerpetualStream(TwitterStreamProvider.STREAMS_ID, stream); - builder.addStreamsProcessor("converter", converter, 2, TwitterStreamProvider.STREAMS_ID); - builder.addStreamsProcessor("NoDeletesProcessor", noDeletesProcessor, 1, "converter"); - builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, writer, 1, "NoDeletesProcessor"); - builder.addStreamsProcessor("DeleteOnlyProcessor", deleteOnlyProcessor, 1, "converter"); - builder.addStreamsProcessor("SetDeleteIdProcessor", setDeleteIdProcessor, 1, "DeleteOnlyProcessor"); - builder.addStreamsPersistWriter("deleter", deleter, 1, "SetDeleteIdProcessor"); + builder.newPerpetualStream(TwitterStreamProvider.class.getCanonicalName(), stream); + builder.addStreamsProcessor(ActivityConverterProcessor.class.getCanonicalName(), converter, 2, TwitterStreamProvider.class.getCanonicalName()); + builder.addStreamsProcessor(VerbDefinitionDropFilter.class.getCanonicalName(), noDeletesProcessor, 1, ActivityConverterProcessor.class.getCanonicalName()); + builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), writer, 1, VerbDefinitionDropFilter.class.getCanonicalName()); + builder.addStreamsProcessor(VerbDefinitionKeepFilter.class.getCanonicalName(), deleteOnlyProcessor, 1, ActivityConverterProcessor.class.getCanonicalName()); + builder.addStreamsProcessor(SetDeleteIdProcessor.class.getCanonicalName(), setDeleteIdProcessor, 1, VerbDefinitionKeepFilter.class.getCanonicalName()); + builder.addStreamsPersistWriter(ElasticsearchPersistDeleter.class.getCanonicalName(), deleter, 1, SetDeleteIdProcessor.class.getCanonicalName()); - builder.start(); + builder.start(); - } + } - protected class SetDeleteIdProcessor implements StreamsProcessor { + protected class SetDeleteIdProcessor implements StreamsProcessor { - public String getId() { - return "TwitterUserstreamElasticsearch.SetDeleteIdProcessor"; - } + public String getId() { + return "TwitterUserstreamElasticsearch.SetDeleteIdProcessor"; + } - @Override - public List process(StreamsDatum entry) { + @Override + public List process(StreamsDatum entry) { - Preconditions.checkArgument(entry.getDocument() instanceof Activity); - String id = entry.getId(); - // replace delete with post in id - // ensure ElasticsearchPersistDeleter will remove original post if present - id = Strings.replace(id, "delete", "post"); - entry.setId(id); + Preconditions.checkArgument(entry.getDocument() instanceof Activity); + String id = entry.getId(); + // replace delete with post in id + // ensure ElasticsearchPersistDeleter will remove original post if present + id = Strings.replace(id, "delete", "post"); + entry.setId(id); - return Lists.newArrayList(entry); - } + return Lists.newArrayList(entry); + } - @Override - public void prepare(Object configurationObject) { + @Override + public void prepare(Object configurationObject) { - } + } - @Override - public void cleanUp() { + @Override + public void cleanUp() { - } } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java ---------------------------------------------------------------------- diff --git a/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java index 2fd26db..63dd8de 100644 --- a/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java +++ b/local/twitter-userstream-elasticsearch/src/test/java/org/apache/streams/example/test/TwitterUserstreamElasticsearchIT.java @@ -26,7 +26,6 @@ import org.apache.streams.example.TwitterUserstreamElasticsearchConfiguration; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; - import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; @@ -45,7 +44,6 @@ import org.testng.annotations.Test; import java.io.File; -import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.AssertJUnit.assertTrue; @@ -54,55 +52,55 @@ import static org.testng.AssertJUnit.assertTrue; */ public class TwitterUserstreamElasticsearchIT { - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearchIT.class); + private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearchIT.class); - protected TwitterUserstreamElasticsearchConfiguration testConfiguration; - protected Client testClient; + protected TwitterUserstreamElasticsearchConfiguration testConfiguration; + protected Client testClient; - private int count = 0; + private int count = 0; - @BeforeClass - public void prepareTest() throws Exception { + @BeforeClass + public void prepareTest() throws Exception { - Config reference = ConfigFactory.load(); - File conf_file = new File("target/test-classes/TwitterUserstreamElasticsearchIT.conf"); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - testConfiguration = new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(typesafe); - testClient = ElasticsearchClientManager.getInstance(testConfiguration.getElasticsearch()).client(); + Config reference = ConfigFactory.load(); + File conf_file = new File("target/test-classes/TwitterUserstreamElasticsearchIT.conf"); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + testConfiguration = new ComponentConfigurator<>(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(typesafe); + testClient = ElasticsearchClientManager.getInstance(testConfiguration.getElasticsearch()).client(); - ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); - ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); - assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); + ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest(); + ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet(); + assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED); - IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex()); - IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); - if(indicesExistsResponse.isExists()) { - DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getElasticsearch().getIndex()); - DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet(); - assertTrue(deleteIndexResponse.isAcknowledged()); - }; + IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getElasticsearch().getIndex()); + IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet(); + if(indicesExistsResponse.isExists()) { + DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getElasticsearch().getIndex()); + DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet(); + assertTrue(deleteIndexResponse.isAcknowledged()); + }; - } + } - @Test - public void testUserstreamElasticsearch() throws Exception { + @Test + public void testUserstreamElasticsearch() throws Exception { - TwitterUserstreamElasticsearch stream = new TwitterUserstreamElasticsearch(testConfiguration); + TwitterUserstreamElasticsearch stream = new TwitterUserstreamElasticsearch(testConfiguration); - Thread thread = new Thread(stream); - thread.start(); - thread.join(30000); + Thread thread = new Thread(stream); + thread.start(); + thread.join(30000); - // assert lines in file - SearchRequestBuilder countRequest = testClient - .prepareSearch(testConfiguration.getElasticsearch().getIndex()) - .setTypes(testConfiguration.getElasticsearch().getType()); - SearchResponse countResponse = countRequest.execute().actionGet(); + // assert lines in file + SearchRequestBuilder countRequest = testClient + .prepareSearch(testConfiguration.getElasticsearch().getIndex()) + .setTypes(testConfiguration.getElasticsearch().getType()); + SearchResponse countResponse = countRequest.execute().actionGet(); - count = (int)countResponse.getHits().getTotalHits(); + count = (int)countResponse.getHits().getTotalHits(); - assertNotEquals(count, 0); - } + assertNotEquals(count, 0); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf ---------------------------------------------------------------------- diff --git a/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf b/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf index df9be4d..bca2d51 100644 --- a/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf +++ b/local/twitter-userstream-elasticsearch/src/test/resources/TwitterUserstreamElasticsearchIT.conf @@ -26,4 +26,5 @@ elasticsearch { index = twitter_userstream_elasticsearch_it type = activity forceUseConfig = true -} \ No newline at end of file +} +taskTimeoutMs = 60000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index e369e36..384d71a 100644 --- a/pom.xml +++ b/pom.xml @@ -263,10 +263,88 @@ + + + maven-checkstyle-plugin + + + org.scalastyle + scalastyle-maven-plugin + + + maven-compiler-plugin + + + maven-site-plugin + + + maven-enforcer-plugin + + + maven-resources-plugin + + + maven-remote-resources-plugin + + org.apache.maven.plugins + maven-checkstyle-plugin + ${checkstyle.plugin.version} + + + com.puppycrawl.tools + checkstyle + 7.2 + + + + + validate + validate + + http://streams.incubator.apache.org/site/${project.version}/streams-master/streams-java-checkstyle.xml + UTF-8 + true + true + ${project.basedir}/src/test/java + false + + + check + + + + + + org.scalastyle + scalastyle-maven-plugin + ${scalastyle.plugin.version} + + + validate + validate + + false + false + true + false + ${project.basedir}/src/main/scala + ${project.basedir}/src/test/scala + + ${project.build.directory}/scalastyle-output.xml + UTF-8 + + + check + + + + + + org.apache.maven.plugins maven-shade-plugin ${shade.plugin.version}