ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: Ignite-530 Pull request fixes. Fixes according to https://cwiki.apache.org/confluence/display/IGNITE/Streamers+Implementation+Guidelines.
Date Thu, 05 Nov 2015 12:59:16 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-530-pull-fixes 8f178a05b -> a0e707334


Ignite-530 Pull request fixes.
Fixes according to https://cwiki.apache.org/confluence/display/IGNITE/Streamers+Implementation+Guidelines.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a0e70733
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a0e70733
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a0e70733

Branch: refs/heads/ignite-530-pull-fixes
Commit: a0e70733434dead84d13e20bc4eebdf2ce696cae
Parents: 8f178a0
Author: Anton Vinogradov <av@apache.org>
Authored: Thu Nov 5 15:59:03 2015 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Thu Nov 5 15:59:03 2015 +0300

----------------------------------------------------------------------
 .../ignite/stream/twitter/TweetTransformer.java | 37 ------------------
 .../ignite/stream/twitter/TwitterStreamer.java  | 40 +++++---------------
 .../twitter/IgniteTwitterStreamerTest.java      |  7 +++-
 .../stream/twitter/TwitterStreamerImpl.java     | 18 +++++----
 4 files changed, 25 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a0e70733/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TweetTransformer.java
----------------------------------------------------------------------
diff --git a/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TweetTransformer.java
b/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TweetTransformer.java
deleted file mode 100644
index a2f4c02..0000000
--- a/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TweetTransformer.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.twitter;
-
-import java.util.Map;
-
-/**
- * Implement this interface to transform from a Tweet JSON String to a set of cache entries
in the form of a {@link
- * Map}.
- *
- * @param <K> The type of the cache key.
- * @param <V> The type of the cache value.
- */
-public interface TweetTransformer<K, V> {
-    /**
-     * Transformation function.
-     *
-     * @param tweet The message (Tweet JSON String) received from the Twitter Streaming API.
-     * @return Set of cache entries to add to the cache. It could be empty or null if the
message should be skipped.
-     */
-    Map<K, V> apply(String tweet);
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a0e70733/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TwitterStreamer.java
----------------------------------------------------------------------
diff --git a/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TwitterStreamer.java
b/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TwitterStreamer.java
index 781cfca..5b53db0 100644
--- a/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TwitterStreamer.java
+++ b/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TwitterStreamer.java
@@ -54,9 +54,6 @@ import org.apache.ignite.stream.StreamAdapter;
  * This streamer uses https://dev.twitter.com/streaming API and supports Public API, User
Streams,
  * Site Streams and Firehose.
  * <p>
- * You can also provide a {@link TweetTransformer} to convert the incoming message into cache
entries to override
- * default transformer.
- * <p>
  * This Streamer features:
  * <ul>
  *     <li>Supports OAuth1 authentication scheme.
@@ -68,17 +65,9 @@ public abstract class TwitterStreamer<K, V> extends StreamAdapter<String,
K, V>
     /** Logger. */
     protected IgniteLogger log;
 
-    /**
-     * Threads count used to transform tweets.
-     */
+    /** Threads count used to transform tweets. */
     private int threadsCount = 1;
 
-    /**
-     * The message transformer that converts an incoming Tweet into cache entries. If not
provided default transformer
-     * will be used.
-     */
-    private TweetTransformer<K, V> transformer;
-
     /** Twitter Streaming API params. See https://dev.twitter.com/streaming/overview/request-parameters
*/
     private Map<String, String> apiParams;
 
@@ -137,21 +126,18 @@ public abstract class TwitterStreamer<K, V> extends StreamAdapter<String,
K, V>
 
                 @Override
                 public Boolean call() {
-                    while (!client.isDone() && running.get() == 1) {
+                    while (true) {
                         try {
                             String tweet = tweetQueue.take();
 
-                            Map<K, V> value = transformer.apply(tweet);
-
-                            if (value != null)
-                                getStreamer().addData(value);
+                            addMessage(tweet);
                         }
                         catch (InterruptedException e) {
                             log.error("Tweets transformation was interrupted", e);
+
+                            return true;
                         }
                     }
-
-                    return true;
                 }
             };
 
@@ -177,10 +163,11 @@ public abstract class TwitterStreamer<K, V> extends StreamAdapter<String,
K, V>
      * Validates config at start.
      */
     protected void validateConfig() {
-        A.notNull(getStreamer(), "streamer");
-        A.notNull(getIgnite(), "ignite");
+        A.notNull(getStreamer(), "Streamer");
+        A.notNull(getIgnite(), "Ignite");
         A.notNull(endpointUrl, "Twitter Streaming API endpoint");
-        A.notNull(transformer, "Transformer");
+
+        A.ensure(getSingleTupleExtractor() != null || getMultipleTupleExtractor() != null,
"Twitter extractor");
 
         String followParam = apiParams.get(SITE_USER_ID_KEY);
 
@@ -271,15 +258,6 @@ public abstract class TwitterStreamer<K, V> extends StreamAdapter<String,
K, V>
     }
 
     /**
-     * Sets Transformer.
-     *
-     * @param transformer Transformer.
-     */
-    public void setTransformer(TweetTransformer<K, V> transformer) {
-        this.transformer = transformer;
-    }
-
-    /**
      * Sets API Params.
      *
      * @param apiParams API Params.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a0e70733/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/IgniteTwitterStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/IgniteTwitterStreamerTest.java
b/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/IgniteTwitterStreamerTest.java
index f341e22..33633a4 100644
--- a/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/IgniteTwitterStreamerTest.java
+++ b/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/IgniteTwitterStreamerTest.java
@@ -70,6 +70,11 @@ public class IgniteTwitterStreamerTest extends GridCommonAbstractTest {
     public final WireMockServer mockServer = new WireMockServer(); //Starts server on 8080
port.
 
     /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10_000;
+    }
+
+    /** {@inheritDoc} */
     @Override public void beforeTest() throws Exception {
         grid().getOrCreateCache(defaultCacheConfiguration());
 
@@ -130,7 +135,7 @@ public class IgniteTwitterStreamerTest extends GridCommonAbstractTest
{
         CountDownLatch latch = listener.getLatch();
 
         //Enough tweets was handled in 10 seconds.
-        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        latch.await();
 
         unsubscribeToPutEvents(listener);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a0e70733/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/TwitterStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/TwitterStreamerImpl.java
b/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/TwitterStreamerImpl.java
index 0535d06..8ec325c 100644
--- a/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/TwitterStreamerImpl.java
+++ b/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/TwitterStreamerImpl.java
@@ -20,10 +20,11 @@ package org.apache.ignite.stream.twitter;
 import com.twitter.hbc.core.Client;
 import com.twitter.hbc.core.HttpHosts;
 import com.twitter.hbc.core.endpoint.StreamingEndpoint;
-import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
 import twitter4j.Status;
 import twitter4j.TwitterException;
 import twitter4j.TwitterObjectFactory;
@@ -41,7 +42,7 @@ public class TwitterStreamerImpl extends TwitterStreamer<Long, String>
{
     public TwitterStreamerImpl(OAuthSettings oAuthSettings) {
         super(oAuthSettings);
 
-        setTransformer(new TweetTransformerImpl());
+        setSingleTupleExtractor(new TwitterStreamSingleTupleExtractorImpl());
     }
 
     /**
@@ -52,20 +53,20 @@ public class TwitterStreamerImpl extends TwitterStreamer<Long, String>
{
     }
 
     /** {@inheritDoc} */
-    @Override protected Client buildClient(BlockingQueue<String> tweetQueue, HttpHosts
hosts, StreamingEndpoint endpoint) {
+    @Override protected Client buildClient(BlockingQueue<String> tweetQueue, HttpHosts
hosts,
+        StreamingEndpoint endpoint) {
         return super.buildClient(tweetQueue, this.hosts, endpoint);
     }
 
     /**
-     * Long, String Tweet Transformer
+     * Long, String Tweet Single Tuple Extractor.
      */
-    private class TweetTransformerImpl implements TweetTransformer<Long, String> {
-        /** {@inheritDoc} */
-        @Override public Map<Long, String> apply(String tweet) {
+    class TwitterStreamSingleTupleExtractorImpl implements StreamSingleTupleExtractor<String,
Long, String> {
+        @Override public Map.Entry<Long, String> extract(String tweet) {
             try {
                 Status status = TwitterObjectFactory.createStatus(tweet);
 
-                return Collections.singletonMap(status.getId(), status.getText());
+                return new IgniteBiTuple<>(status.getId(), status.getText());
             }
             catch (TwitterException e) {
                 U.error(log, e);
@@ -74,4 +75,5 @@ public class TwitterStreamerImpl extends TwitterStreamer<Long, String>
{
             }
         }
     }
+
 }


Mime
View raw message