Return-Path: X-Original-To: apmail-streams-dev-archive@minotaur.apache.org Delivered-To: apmail-streams-dev-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C650E11F8F for ; Tue, 6 May 2014 00:03:22 +0000 (UTC) Received: (qmail 85497 invoked by uid 500); 6 May 2014 00:03:21 -0000 Delivered-To: apmail-streams-dev-archive@streams.apache.org Received: (qmail 85438 invoked by uid 500); 6 May 2014 00:03:21 -0000 Mailing-List: contact dev-help@streams.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@streams.incubator.apache.org Delivered-To: mailing list dev@streams.incubator.apache.org Received: (qmail 85430 invoked by uid 99); 6 May 2014 00:03:21 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 May 2014 00:03:21 +0000 X-ASF-Spam-Status: No, hits=-2000.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 06 May 2014 00:03:20 +0000 Received: (qmail 85348 invoked by uid 99); 6 May 2014 00:02:56 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 May 2014 00:02:56 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id F2BD79204AB; Tue, 6 May 2014 00:02:55 +0000 (UTC) From: smashew To: dev@streams.incubator.apache.org Reply-To: dev@streams.incubator.apache.org References: In-Reply-To: Subject: [GitHub] incubator-streams pull request: Twitter Modificaitons Content-Type: text/plain Message-Id: <20140506000255.F2BD79204AB@tyr.zones.apache.org> Date: Tue, 6 May 2014 00:02:55 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Github user smashew commented on a diff in the pull request: 
https://github.com/apache/incubator-streams/pull/8#discussion_r12306365 --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java --- @@ -0,0 +1,286 @@ +package org.apache.streams.twitter.provider; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.typesafe.config.Config; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.twitter.TwitterStreamConfiguration; +import org.apache.streams.twitter.TwitterUserInformationConfiguration; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; +import twitter4j.*; +import twitter4j.conf.ConfigurationBuilder; +import twitter4j.json.DataObjectFactory; + +import java.io.Serializable; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.*; + +public class TwitterUserInformationProvider implements StreamsProvider, Serializable +{ + + public static final String STREAMS_ID = "TwitterUserInformationProvider"; + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProvider.class); + + + private TwitterUserInformationConfiguration twitterUserInformationConfiguration; + + private Class klass; + protected volatile Queue providerQueue = new LinkedBlockingQueue(); + + public TwitterUserInformationConfiguration getConfig() { return twitterUserInformationConfiguration; } + + public void setConfig(TwitterUserInformationConfiguration config) { 
this.twitterUserInformationConfiguration = config; } + + protected Iterator idsBatches; + protected Iterator screenNameBatches; + + protected ListeningExecutorService executor; + + protected DateTime start; + protected DateTime end; + + private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { + return new ThreadPoolExecutor(nThreads, nThreads, + 5000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); + } + + public TwitterUserInformationProvider() { + Config config = StreamsConfigurator.config.getConfig("twitter"); + this.twitterUserInformationConfiguration = TwitterStreamConfigurator.detectTwitterUserInformationConfiguration(config); + + } + + public TwitterUserInformationProvider(TwitterUserInformationConfiguration config) { + this.twitterUserInformationConfiguration = config; + } + + public TwitterUserInformationProvider(Class klass) { + Config config = StreamsConfigurator.config.getConfig("twitter"); + this.twitterUserInformationConfiguration = TwitterStreamConfigurator.detectTwitterUserInformationConfiguration(config); + this.klass = klass; + } + + public TwitterUserInformationProvider(TwitterUserInformationConfiguration config, Class klass) { + this.twitterUserInformationConfiguration = config; + this.klass = klass; + } + + public Queue getProviderQueue() { + return this.providerQueue; + } + + @Override + public void startStream() { + // no op + } + + + private void loadBatch(Long[] ids) { + Twitter client = getTwitterClient(); + int keepTrying = 0; + + // keep trying to load, give it 5 attempts. 
+ //while (keepTrying < 10) + while (keepTrying < 1) + { + try + { + long[] toQuery = new long[ids.length]; + for(int i = 0; i < ids.length; i++) + toQuery[i] = ids[i]; + + for (User tStat : client.lookupUsers(toQuery)) { + String json = DataObjectFactory.getRawJSON(tStat); + providerQueue.offer(new StreamsDatum(json)); --- End diff -- As a side note, the synchronized block on that class may be sub-optimal for efficiency as it should be synchronized on the queue, rather than being synchronized on a static reference to the utility class (which doesn't make much sense). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---