ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: Ignite-530 Pull request fixes.
Date Wed, 04 Nov 2015 15:34:17 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-530-pull-fixes [created] 8f178a05b


Ignite-530 Pull request fixes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8f178a05
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8f178a05
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8f178a05

Branch: refs/heads/ignite-530-pull-fixes
Commit: 8f178a05bd1eaad94fb83b2f7771f3b26b6d7f99
Parents: 0cfb32f
Author: Anton Vinogradov <av@apache.org>
Authored: Wed Nov 4 18:33:50 2015 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Wed Nov 4 18:33:50 2015 +0300

----------------------------------------------------------------------
 modules/twitter/pom.xml                         | 116 +++++++
 .../ignite/stream/twitter/OAuthSettings.java    |  86 +++++
 .../ignite/stream/twitter/TweetTransformer.java |  37 +++
 .../ignite/stream/twitter/TwitterStreamer.java  | 317 +++++++++++++++++++
 .../twitter/IgniteTwitterStreamerTest.java      | 221 +++++++++++++
 .../twitter/IgniteTwitterStreamerTestSuite.java |  32 ++
 .../stream/twitter/TwitterStreamerImpl.java     |  77 +++++
 pom.xml                                         |   1 +
 8 files changed, 887 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8f178a05/modules/twitter/pom.xml
----------------------------------------------------------------------
diff --git a/modules/twitter/pom.xml b/modules/twitter/pom.xml
new file mode 100644
index 0000000..21e6f51
--- /dev/null
+++ b/modules/twitter/pom.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-parent</artifactId>
+        <version>1</version>
+        <relativePath>../../parent</relativePath>
+    </parent>
+
+    <artifactId>ignite-twitter</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+    <url>http://ignite.apache.org</url>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-log4j</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-spring</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.twitter</groupId>
+            <artifactId>hbc-twitter4j</artifactId>
+            <version>2.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.tomakehurst</groupId>
+            <artifactId>wiremock</artifactId>
+            <version>1.57</version>
+            <classifier>standalone</classifier>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jetty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpclient</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.skyscreamer</groupId>
+                    <artifactId>jsonassert</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>xmlunit</groupId>
+                    <artifactId>xmlunit</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.jayway.jsonpath</groupId>
+                    <artifactId>json-path</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.sf.jopt-simple</groupId>
+                    <artifactId>jopt-simple</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f178a05/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/OAuthSettings.java
----------------------------------------------------------------------
diff --git a/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/OAuthSettings.java
b/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/OAuthSettings.java
new file mode 100644
index 0000000..c2e46a5
--- /dev/null
+++ b/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/OAuthSettings.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.twitter;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * OAuth keys holder.
+ */
+public class OAuthSettings {
+    /** */
+    private final String consumerKey;
+
+    /** */
+    private final String consumerSecret;
+
+    /** */
+    private final String accessToken;
+
+    /** */
+    private final String accessTokenSecret;
+
+    /**
+     * @param consumerKey Consumer key.
+     * @param consumerSecret Consumer secret.
+     * @param accessToken Access token.
+     * @param accessTokenSecret Access secret token.
+     */
+    public OAuthSettings(
+        @NotNull String consumerKey,
+        @NotNull String consumerSecret,
+        @NotNull String accessToken,
+        @NotNull String accessTokenSecret) {
+        this.consumerKey = consumerKey;
+        this.consumerSecret = consumerSecret;
+        this.accessToken = accessToken;
+        this.accessTokenSecret = accessTokenSecret;
+    }
+
+    /**
+     * @return Consumer key.
+     */
+    @NotNull
+    public String getConsumerKey() {
+        return consumerKey;
+    }
+
+    /**
+     * @return Consumer secret.
+     */
+    @NotNull
+    public String getConsumerSecret() {
+        return consumerSecret;
+    }
+
+    /**
+     * @return Access token.
+     */
+    @NotNull
+    public String getAccessToken() {
+        return accessToken;
+    }
+
+    /**
+     * @return Access token secret.
+     */
+    @NotNull
+    public String getAccessTokenSecret() {
+        return accessTokenSecret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f178a05/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TweetTransformer.java
----------------------------------------------------------------------
diff --git a/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TweetTransformer.java
b/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TweetTransformer.java
new file mode 100644
index 0000000..a2f4c02
--- /dev/null
+++ b/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TweetTransformer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.twitter;
+
+import java.util.Map;
+
+/**
+ * Implement this interface to transform from a Tweet JSON String to a set of cache entries
in the form of a {@link
+ * Map}.
+ *
+ * @param <K> The type of the cache key.
+ * @param <V> The type of the cache value.
+ */
+public interface TweetTransformer<K, V> {
+    /**
+     * Transformation function.
+     *
+     * @param tweet The message (Tweet JSON String) received from the Twitter Streaming API.
+     * @return Set of cache entries to add to the cache. It could be empty or null if the
message should be skipped.
+     */
+    Map<K, V> apply(String tweet);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f178a05/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TwitterStreamer.java
----------------------------------------------------------------------
diff --git a/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TwitterStreamer.java
b/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TwitterStreamer.java
new file mode 100644
index 0000000..781cfca
--- /dev/null
+++ b/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TwitterStreamer.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.twitter;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.twitter.hbc.ClientBuilder;
+import com.twitter.hbc.core.Client;
+import com.twitter.hbc.core.HttpConstants;
+import com.twitter.hbc.core.HttpHosts;
+import com.twitter.hbc.core.endpoint.DefaultStreamingEndpoint;
+import com.twitter.hbc.core.endpoint.SitestreamEndpoint;
+import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
+import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
+import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
+import com.twitter.hbc.core.endpoint.StreamingEndpoint;
+import com.twitter.hbc.core.endpoint.UserstreamEndpoint;
+import com.twitter.hbc.core.processor.StringDelimitedProcessor;
+import com.twitter.hbc.httpclient.auth.Authentication;
+import com.twitter.hbc.httpclient.auth.OAuth1;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.stream.StreamAdapter;
+
+/**
+ * Streamer that consumes from a Twitter Streaming API and feeds transformed key-value pairs,
+ * by default <tweetId, text>, into an {@link IgniteDataStreamer} instance.
+ * <p>
+ * This streamer uses https://dev.twitter.com/streaming API and supports Public API, User
Streams,
+ * Site Streams and Firehose.
+ * <p>
+ * You can also provide a {@link TweetTransformer} to convert the incoming message into cache
entries to override
+ * default transformer.
+ * <p>
+ * This Streamer features:
+ * <ul>
+ *     <li>Supports OAuth1 authentication scheme.
+ *     <br/> BasicAuth not supported by Streaming API https://dev.twitter.com/streaming/overview/connecting</li>
+ *     <li>Provide all params in apiParams map. https://dev.twitter.com/streaming/overview/request-parameters</li>
+ * </ul>
+ */
+public abstract class TwitterStreamer<K, V> extends StreamAdapter<String, K, V>
{
+    /** Logger. */
+    protected IgniteLogger log;
+
+    /**
+     * Threads count used to transform tweets.
+     */
+    private int threadsCount = 1;
+
+    /**
+     * The message transformer that converts an incoming Tweet into cache entries. If not
provided default transformer
+     * will be used.
+     */
+    private TweetTransformer<K, V> transformer;
+
+    /** Twitter Streaming API params. See https://dev.twitter.com/streaming/overview/request-parameters
*/
+    private Map<String, String> apiParams;
+
+    /** Twitter streaming API endpoint example, '/statuses/filter.json' or '/statuses/firehose.json'
*/
+    private String endpointUrl;
+
+    /** OAuth params holder */
+    private OAuthSettings oAuthSettings;
+
+    /** shared variable to communicate/signal that streamer is already running or can be
started */
+    private final AtomicInteger running = new AtomicInteger();
+
+    /**
+     * Size of buffer for streaming, as for some tracking terms traffic can be low and for
others high, this is
+     * configurable
+     */
+    private Integer bufferCapacity = 100000;
+
+    /** Twitter streaming client (Twitter HBC) to interact with stream */
+    private Client client;
+
+    /** Process stream asynchronously */
+    private ExecutorService tweetStreamProcessor;
+
+    /** Param key name constant for Site streaming */
+    private final String SITE_USER_ID_KEY = "follow";
+
+    /**
+     * @param oAuthSettings OAuth Settings
+     */
+    public TwitterStreamer(OAuthSettings oAuthSettings) {
+        this.oAuthSettings = oAuthSettings;
+    }
+
+    /**
+     * Starts streamer.
+     */
+    public void start() {
+        if (!running.compareAndSet(0, 1))
+            throw new IgniteException("Attempted to start an already started Twitter Streamer");
+
+        validateConfig();
+
+        log = getIgnite().log();
+
+        final BlockingQueue<String> tweetQueue = new LinkedBlockingQueue<>(bufferCapacity);
+
+        client = getClient(tweetQueue);
+
+        client.connect();
+
+        tweetStreamProcessor = Executors.newFixedThreadPool(threadsCount);
+
+        for (int i = 0; i < threadsCount; i++) {
+            Callable<Boolean> task = new Callable<Boolean>() {
+
+                @Override
+                public Boolean call() {
+                    while (!client.isDone() && running.get() == 1) {
+                        try {
+                            String tweet = tweetQueue.take();
+
+                            Map<K, V> value = transformer.apply(tweet);
+
+                            if (value != null)
+                                getStreamer().addData(value);
+                        }
+                        catch (InterruptedException e) {
+                            log.error("Tweets transformation was interrupted", e);
+                        }
+                    }
+
+                    return true;
+                }
+            };
+
+            tweetStreamProcessor.submit(task);
+        }
+    }
+
+    /**
+     * Stops streamer.
+     */
+    public void stop() {
+        if (running.get() == 0)
+            throw new IgniteException("Attempted to stop an already stopped Twitter Streamer");
+
+        tweetStreamProcessor.shutdownNow();
+
+        client.stop();
+
+        running.compareAndSet(1, 0);
+    }
+
+    /**
+     * Validates config at start.
+     */
+    protected void validateConfig() {
+        A.notNull(getStreamer(), "streamer");
+        A.notNull(getIgnite(), "ignite");
+        A.notNull(endpointUrl, "Twitter Streaming API endpoint");
+        A.notNull(transformer, "Transformer");
+
+        String followParam = apiParams.get(SITE_USER_ID_KEY);
+
+        A.ensure(followParam != null && followParam.matches("^(\\d+,? ?)+$"),
+            "Site streaming endpoint must provide 'follow' param with value as comma separated
numbers");
+    }
+
+    /**
+     * @param tweetQueue Tweet queue.
+     * @return Client.
+     */
+    protected Client getClient(BlockingQueue<String> tweetQueue) {
+        StreamingEndpoint endpoint;
+
+        HttpHosts hosts;
+
+        switch (endpointUrl.toLowerCase()) {
+            case StatusesFilterEndpoint.PATH:
+                endpoint = new StatusesFilterEndpoint();
+
+                hosts = HttpHosts.STREAM_HOST;
+
+                break;
+            case StatusesFirehoseEndpoint.PATH:
+                endpoint = new StatusesFirehoseEndpoint();
+
+                hosts = HttpHosts.STREAM_HOST;
+
+                break;
+            case StatusesSampleEndpoint.PATH:
+                endpoint = new StatusesSampleEndpoint();
+
+                hosts = HttpHosts.STREAM_HOST;
+
+                break;
+            case UserstreamEndpoint.PATH:
+                endpoint = new UserstreamEndpoint();
+
+                hosts = HttpHosts.USERSTREAM_HOST;
+
+                break;
+            case SitestreamEndpoint.PATH:
+                String follow = apiParams.remove(SITE_USER_ID_KEY);
+
+                List<Long> followers = Lists.newArrayList();
+
+                for (String follower : Splitter.on(',').trimResults().omitEmptyStrings().split(follow))
{
+                    followers.add(Long.valueOf(follower));
+                }
+
+                endpoint = new SitestreamEndpoint(followers);
+
+                hosts = HttpHosts.SITESTREAM_HOST;
+
+                break;
+            default:
+                endpoint = new DefaultStreamingEndpoint(endpointUrl, HttpConstants.HTTP_GET,
false);
+
+                hosts = HttpHosts.STREAM_HOST;
+
+        }
+
+        for (Map.Entry<String, String> entry : apiParams.entrySet()) {
+            endpoint.addPostParameter(entry.getKey(), entry.getValue());
+        }
+
+        return buildClient(tweetQueue, hosts, endpoint);
+    }
+
+    /**
+     * @param tweetQueue tweet Queue.
+     * @param hosts Hostes.
+     * @param endpoint Endpoint.
+     * @return Client.
+     */
+    protected Client buildClient(BlockingQueue<String> tweetQueue, HttpHosts hosts,
StreamingEndpoint endpoint) {
+        Authentication authentication = new OAuth1(oAuthSettings.getConsumerKey(), oAuthSettings.getConsumerSecret(),
+            oAuthSettings.getAccessToken(), oAuthSettings.getAccessTokenSecret());
+
+        ClientBuilder builder = new ClientBuilder()
+            .name("Ignite-Twitter-Client")
+            .hosts(hosts)
+            .authentication(authentication)
+            .endpoint(endpoint)
+            .processor(new StringDelimitedProcessor(tweetQueue));
+
+        return builder.build();
+    }
+
+    /**
+     * Sets Transformer.
+     *
+     * @param transformer Transformer.
+     */
+    public void setTransformer(TweetTransformer<K, V> transformer) {
+        this.transformer = transformer;
+    }
+
+    /**
+     * Sets API Params.
+     *
+     * @param apiParams API Params.
+     */
+    public void setApiParams(Map<String, String> apiParams) {
+        this.apiParams = apiParams;
+    }
+
+    /**
+     * Sets Endpoint URL.
+     *
+     * @param endpointUrl Endpoint URL.
+     */
+    public void setEndpointUrl(String endpointUrl) {
+        this.endpointUrl = endpointUrl;
+    }
+
+    /**
+     * Sets Buffer capacity.
+     *
+     * @param bufferCapacity Buffer capacity.
+     */
+    public void setBufferCapacity(Integer bufferCapacity) {
+        this.bufferCapacity = bufferCapacity;
+    }
+
+    /**
+     * Sets Threads count.
+     *
+     * @param threadsCount Threads count.
+     */
+    public void setThreadsCount(int threadsCount) {
+        this.threadsCount = threadsCount;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f178a05/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/IgniteTwitterStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/IgniteTwitterStreamerTest.java
b/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/IgniteTwitterStreamerTest.java
new file mode 100644
index 0000000..f341e22
--- /dev/null
+++ b/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/IgniteTwitterStreamerTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.twitter;
+
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import com.twitter.hbc.core.HttpHosts;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Rule;
+import twitter4j.Status;
+import twitter4j.TwitterException;
+import twitter4j.TwitterObjectFactory;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+
+/**
+ * Test for {@link TwitterStreamer}. Tests Public Status streaming API https://dev.twitter.com/streaming/public.
+ */
+public class IgniteTwitterStreamerTest extends GridCommonAbstractTest {
+
+    /** Cache entries count. */
+    private static final int CACHE_ENTRY_COUNT = 100;
+
+    /** Mocked api in embedded server. */
+    private static final String MOCK_TWEET_PATH = "/tweet/mock";
+
+    /** Sample tweet. */
+    private static final String tweet = "{\"id\":647375831971590144,\"text\":\"sample tweet
to test streamer\"}\n";
+
+    /** Constructor. */
+    public IgniteTwitterStreamerTest() {
+        super(true);
+    }
+
+    /** Embedded mock HTTP server's for Twitter API rule. */
+    @Rule
+    public final WireMockRule wireMockRule = new WireMockRule();
+
+    /** Embedded mock HTTP server for Twitter API. */
+    public final WireMockServer mockServer = new WireMockServer(); //Starts server on 8080
port.
+
+    /** {@inheritDoc} */
+    @Override public void beforeTest() throws Exception {
+        grid().getOrCreateCache(defaultCacheConfiguration());
+
+        mockServer.start();
+
+        stubFor(get(urlMatching("/1.1" + MOCK_TWEET_PATH + ".*")).willReturn(aResponse().
+            withHeader("Content-Type", "text/plain").withBody(tweet.length() + "\n" + tweet)));
+    }
+
+    /** {@inheritDoc} */
+    public void afterTest() throws Exception {
+        stopAllGrids();
+
+        mockServer.stop();
+    }
+
+    /**
+     * @throws Exception Test exception.
+     */
+    public void testStatusesFilterEndpointOAuth1() throws Exception {
+        try (IgniteDataStreamer<Long, String> dataStreamer = grid().dataStreamer(null))
{
+            TwitterStreamerImpl streamer = newStreamerInstance(dataStreamer);
+
+            Map<String, String> params = new HashMap<>();
+
+            params.put("track", "apache, twitter");
+            params.put("follow", "3004445758");//@ApacheIgnite id.
+
+            streamer.setApiParams(params);
+            streamer.setEndpointUrl(MOCK_TWEET_PATH);
+            streamer.setHosts(new HttpHosts("http://localhost:8080"));
+            streamer.setThreadsCount(8);
+
+            executeStreamer(streamer);
+        }
+    }
+
+    /**
+     * @param streamer Twitter streamer.
+     * @throws InterruptedException Test exception.
+     * @throws TwitterException Test exception.
+     */
+    private void executeStreamer(TwitterStreamer streamer) throws InterruptedException, TwitterException
{
+        //Checking streaming.
+
+        CacheListener listener = subscribeToPutEvents();
+
+        streamer.start();
+
+        try{
+            streamer.start();
+
+            A.ensure(false, "Streamer concurrent start allowed instead of denied");
+        }catch (IgniteException ex){
+            //No-op
+        }
+
+        CountDownLatch latch = listener.getLatch();
+
+        //Enough tweets was handled in 10 seconds.
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+        unsubscribeToPutEvents(listener);
+
+        streamer.stop();
+
+        //Checking cache content after streaming finished.
+
+        Status status = TwitterObjectFactory.createStatus(tweet);
+
+        IgniteCache<Long, String> cache = grid().cache(null);
+
+        String cachedValue = cache.get(status.getId());
+
+        //Tweet successfully put to cache.
+        assertTrue(cachedValue != null && cachedValue.equals(status.getText()));
+
+        //Same tweets does not produce duplicate entries.
+        assertTrue(cache.size() == 1);
+    }
+
+    /**
+     * @return Cache listener.
+     */
+    private CacheListener subscribeToPutEvents() {
+        Ignite ignite = grid();
+
+        // Listen to cache PUT events and expect as many as messages as test data items
+        CacheListener listener = new CacheListener();
+
+        ignite.events(ignite.cluster().forCacheNodes(null)).localListen(listener, EVT_CACHE_OBJECT_PUT);
+
+        return listener;
+    }
+
+    /**
+     * @param listener Cache listener.
+     */
+    private void unsubscribeToPutEvents(CacheListener listener) {
+        Ignite ignite = grid();
+
+        ignite.events(ignite.cluster().forCacheNodes(null)).stopLocalListen(listener, EVT_CACHE_OBJECT_PUT);
+    }
+
+    /**
+     * @param dataStreamer Ignite Data Streamer.
+     * @return Twitter Streamer.
+     */
+    private TwitterStreamerImpl newStreamerInstance(IgniteDataStreamer<Long, String>
dataStreamer) {
+        OAuthSettings oAuthSettings = new OAuthSettings("<dummy>", "<dummy>",
"<dummy>", "<dummy>");
+
+        TwitterStreamerImpl streamer = new TwitterStreamerImpl(oAuthSettings);
+
+        streamer.setIgnite(grid());
+        streamer.setStreamer(dataStreamer);
+
+        dataStreamer.allowOverwrite(true);
+        dataStreamer.autoFlushFrequency(10);
+
+        return streamer;
+    }
+
+    /**
+     * Listener.
+     */
+    private class CacheListener implements IgnitePredicate<CacheEvent> {
+
+        /** */
+        private final CountDownLatch latch = new CountDownLatch(CACHE_ENTRY_COUNT);
+
+        /**
+         * @return Latch.
+         */
+        public CountDownLatch getLatch() {
+            return latch;
+        }
+
+        /**
+         * @param evt Cache Event.
+         * @return {@code true}.
+         */
+        @Override
+        public boolean apply(CacheEvent evt) {
+            latch.countDown();
+
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f178a05/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/IgniteTwitterStreamerTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/IgniteTwitterStreamerTestSuite.java
b/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/IgniteTwitterStreamerTestSuite.java
new file mode 100644
index 0000000..b458bed
--- /dev/null
+++ b/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/IgniteTwitterStreamerTestSuite.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.twitter;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Twitter streamer tests.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    IgniteTwitterStreamerTest.class
+})
+public class IgniteTwitterStreamerTestSuite {
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f178a05/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/TwitterStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/TwitterStreamerImpl.java
b/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/TwitterStreamerImpl.java
new file mode 100644
index 0000000..0535d06
--- /dev/null
+++ b/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/TwitterStreamerImpl.java
@@ -0,0 +1,77 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.stream.twitter;
+
+import com.twitter.hbc.core.Client;
+import com.twitter.hbc.core.HttpHosts;
+import com.twitter.hbc.core.endpoint.StreamingEndpoint;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import twitter4j.Status;
+import twitter4j.TwitterException;
+import twitter4j.TwitterObjectFactory;
+
+/**
+ * Long, String implementation of TwitterStreamer.
+ */
+public class TwitterStreamerImpl extends TwitterStreamer<Long, String> {
+    /** Mocked server support. */
+    HttpHosts hosts;
+
+    /**
+     * @param oAuthSettings OAuth Settings
+     */
+    public TwitterStreamerImpl(OAuthSettings oAuthSettings) {
+        super(oAuthSettings);
+
+        setTransformer(new TweetTransformerImpl());
+    }
+
+    /**
+     * @param hosts hosts.
+     */
+    public void setHosts(HttpHosts hosts) {
+        this.hosts = hosts;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Client buildClient(BlockingQueue<String> tweetQueue, HttpHosts
hosts, StreamingEndpoint endpoint) {
+        return super.buildClient(tweetQueue, this.hosts, endpoint);
+    }
+
+    /**
+     * Long, String Tweet Transformer
+     */
+    private class TweetTransformerImpl implements TweetTransformer<Long, String> {
+        /** {@inheritDoc} */
+        @Override public Map<Long, String> apply(String tweet) {
+            try {
+                Status status = TwitterObjectFactory.createStatus(tweet);
+
+                return Collections.singletonMap(status.getId(), status.getText());
+            }
+            catch (TwitterException e) {
+                U.error(log, e);
+
+                return null;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f178a05/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5f06555..2bd290c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
         <module>modules/kafka</module>
         <module>modules/yarn</module>
         <module>modules/jms11</module>
+        <module>modules/twitter</module>
         <module>modules/mqtt</module>
         <module>modules/zookeeper</module>
         <module>modules/platform</module>


Mime
View raw message