Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E7BE517EF5 for ; Thu, 5 Nov 2015 12:59:16 +0000 (UTC) Received: (qmail 70270 invoked by uid 500); 5 Nov 2015 12:59:16 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 70236 invoked by uid 500); 5 Nov 2015 12:59:16 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 70227 invoked by uid 99); 5 Nov 2015 12:59:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Nov 2015 12:59:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AB222E3913; Thu, 5 Nov 2015 12:59:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: Ignite-530 Pull request fixes. Fixes according to https://cwiki.apache.org/confluence/display/IGNITE/Streamers+Implementation+Guidelines. Date: Thu, 5 Nov 2015 12:59:16 +0000 (UTC) Repository: ignite Updated Branches: refs/heads/ignite-530-pull-fixes 8f178a05b -> a0e707334 Ignite-530 Pull request fixes. Fixes according to https://cwiki.apache.org/confluence/display/IGNITE/Streamers+Implementation+Guidelines. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a0e70733 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a0e70733 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a0e70733 Branch: refs/heads/ignite-530-pull-fixes Commit: a0e70733434dead84d13e20bc4eebdf2ce696cae Parents: 8f178a0 Author: Anton Vinogradov Authored: Thu Nov 5 15:59:03 2015 +0300 Committer: Anton Vinogradov Committed: Thu Nov 5 15:59:03 2015 +0300 ---------------------------------------------------------------------- .../ignite/stream/twitter/TweetTransformer.java | 37 ------------------ .../ignite/stream/twitter/TwitterStreamer.java | 40 +++++--------------- .../twitter/IgniteTwitterStreamerTest.java | 7 +++- .../stream/twitter/TwitterStreamerImpl.java | 18 +++++---- 4 files changed, 25 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a0e70733/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TweetTransformer.java ---------------------------------------------------------------------- diff --git a/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TweetTransformer.java b/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TweetTransformer.java deleted file mode 100644 index a2f4c02..0000000 --- a/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TweetTransformer.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.stream.twitter; - -import java.util.Map; - -/** - * Implement this interface to transform from a Tweet JSON String to a set of cache entries in the form of a {@link - * Map}. - * - * @param The type of the cache key. - * @param The type of the cache value. - */ -public interface TweetTransformer { - /** - * Transformation function. - * - * @param tweet The message (Tweet JSON String) received from the Twitter Streaming API. - * @return Set of cache entries to add to the cache. It could be empty or null if the message should be skipped. - */ - Map apply(String tweet); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/a0e70733/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TwitterStreamer.java ---------------------------------------------------------------------- diff --git a/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TwitterStreamer.java b/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TwitterStreamer.java index 781cfca..5b53db0 100644 --- a/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TwitterStreamer.java +++ b/modules/twitter/src/main/java/org/apache/ignite/stream/twitter/TwitterStreamer.java @@ -54,9 +54,6 @@ import org.apache.ignite.stream.StreamAdapter; * This streamer uses https://dev.twitter.com/streaming API and supports Public API, User Streams, * Site Streams and Firehose. *

- * You can also provide a {@link TweetTransformer} to convert the incoming message into cache entries to override - * default transformer. - *

* This Streamer features: *

    *
  • Supports OAuth1 authentication scheme. @@ -68,17 +65,9 @@ public abstract class TwitterStreamer extends StreamAdapter /** Logger. */ protected IgniteLogger log; - /** - * Threads count used to transform tweets. - */ + /** Threads count used to transform tweets. */ private int threadsCount = 1; - /** - * The message transformer that converts an incoming Tweet into cache entries. If not provided default transformer - * will be used. - */ - private TweetTransformer transformer; - /** Twitter Streaming API params. See https://dev.twitter.com/streaming/overview/request-parameters */ private Map apiParams; @@ -137,21 +126,18 @@ public abstract class TwitterStreamer extends StreamAdapter @Override public Boolean call() { - while (!client.isDone() && running.get() == 1) { + while (true) { try { String tweet = tweetQueue.take(); - Map value = transformer.apply(tweet); - - if (value != null) - getStreamer().addData(value); + addMessage(tweet); } catch (InterruptedException e) { log.error("Tweets transformation was interrupted", e); + + return true; } } - - return true; } }; @@ -177,10 +163,11 @@ public abstract class TwitterStreamer extends StreamAdapter * Validates config at start. */ protected void validateConfig() { - A.notNull(getStreamer(), "streamer"); - A.notNull(getIgnite(), "ignite"); + A.notNull(getStreamer(), "Streamer"); + A.notNull(getIgnite(), "Ignite"); A.notNull(endpointUrl, "Twitter Streaming API endpoint"); - A.notNull(transformer, "Transformer"); + + A.ensure(getSingleTupleExtractor() != null || getMultipleTupleExtractor() != null, "Twitter extractor"); String followParam = apiParams.get(SITE_USER_ID_KEY); @@ -271,15 +258,6 @@ public abstract class TwitterStreamer extends StreamAdapter } /** - * Sets Transformer. - * - * @param transformer Transformer. - */ - public void setTransformer(TweetTransformer transformer) { - this.transformer = transformer; - } - - /** * Sets API Params. * * @param apiParams API Params. http://git-wip-us.apache.org/repos/asf/ignite/blob/a0e70733/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/IgniteTwitterStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/IgniteTwitterStreamerTest.java b/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/IgniteTwitterStreamerTest.java index f341e22..33633a4 100644 --- a/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/IgniteTwitterStreamerTest.java +++ b/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/IgniteTwitterStreamerTest.java @@ -70,6 +70,11 @@ public class IgniteTwitterStreamerTest extends GridCommonAbstractTest { public final WireMockServer mockServer = new WireMockServer(); //Starts server on 8080 port. /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 10_000; + } + + /** {@inheritDoc} */ @Override public void beforeTest() throws Exception { grid().getOrCreateCache(defaultCacheConfiguration()); @@ -130,7 +135,7 @@ public class IgniteTwitterStreamerTest extends GridCommonAbstractTest { CountDownLatch latch = listener.getLatch(); //Enough tweets was handled in 10 seconds. - assertTrue(latch.await(10, TimeUnit.SECONDS)); + latch.await(); unsubscribeToPutEvents(listener); http://git-wip-us.apache.org/repos/asf/ignite/blob/a0e70733/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/TwitterStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/TwitterStreamerImpl.java b/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/TwitterStreamerImpl.java index 0535d06..8ec325c 100644 --- a/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/TwitterStreamerImpl.java +++ b/modules/twitter/src/test/java/org/apache/ignite/stream/twitter/TwitterStreamerImpl.java @@ -20,10 +20,11 @@ package org.apache.ignite.stream.twitter; import com.twitter.hbc.core.Client; import com.twitter.hbc.core.HttpHosts; import com.twitter.hbc.core.endpoint.StreamingEndpoint; -import java.util.Collections; import java.util.Map; import java.util.concurrent.BlockingQueue; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.stream.StreamSingleTupleExtractor; import twitter4j.Status; import twitter4j.TwitterException; import twitter4j.TwitterObjectFactory; @@ -41,7 +42,7 @@ public class TwitterStreamerImpl extends TwitterStreamer { public TwitterStreamerImpl(OAuthSettings oAuthSettings) { super(oAuthSettings); - setTransformer(new TweetTransformerImpl()); + setSingleTupleExtractor(new TwitterStreamSingleTupleExtractorImpl()); } /** @@ -52,20 +53,20 @@ public class TwitterStreamerImpl extends TwitterStreamer { } /** {@inheritDoc} */ - @Override protected Client buildClient(BlockingQueue tweetQueue, HttpHosts hosts, StreamingEndpoint endpoint) { + @Override protected Client buildClient(BlockingQueue tweetQueue, HttpHosts hosts, + StreamingEndpoint endpoint) { return super.buildClient(tweetQueue, this.hosts, endpoint); } /** - * Long, String Tweet Transformer + * Long, String Tweet Single Tuple Extractor. */ - private class TweetTransformerImpl implements TweetTransformer { - /** {@inheritDoc} */ - @Override public Map apply(String tweet) { + class TwitterStreamSingleTupleExtractorImpl implements StreamSingleTupleExtractor { + @Override public Map.Entry extract(String tweet) { try { Status status = TwitterObjectFactory.createStatus(tweet); - return Collections.singletonMap(status.getId(), status.getText()); + return new IgniteBiTuple<>(status.getId(), status.getText()); } catch (TwitterException e) { U.error(log, e); @@ -74,4 +75,5 @@ public class TwitterStreamerImpl extends TwitterStreamer { } } } + }