Return-Path: X-Original-To: apmail-streams-dev-archive@minotaur.apache.org Delivered-To: apmail-streams-dev-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 53EA0118E0 for ; Tue, 6 May 2014 07:04:19 +0000 (UTC) Received: (qmail 91776 invoked by uid 500); 6 May 2014 07:04:18 -0000 Delivered-To: apmail-streams-dev-archive@streams.apache.org Received: (qmail 91741 invoked by uid 500); 6 May 2014 07:04:17 -0000 Mailing-List: contact dev-help@streams.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@streams.incubator.apache.org Delivered-To: mailing list dev@streams.incubator.apache.org Delivered-To: moderator for dev@streams.incubator.apache.org Received: (qmail 58727 invoked by uid 99); 5 May 2014 22:31:52 -0000 X-ASF-Spam-Status: No, hits=-2000.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org From: mfranklin To: dev@streams.incubator.apache.org Reply-To: dev@streams.incubator.apache.org References: In-Reply-To: Subject: [GitHub] incubator-streams pull request: Twitter Modificaitons Content-Type: text/plain Message-Id: <20140505223131.29B4A9202B5@tyr.zones.apache.org> Date: Mon, 5 May 2014 22:31:31 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Github user mfranklin commented on a diff in the pull request: https://github.com/apache/incubator-streams/pull/8#discussion_r12303430 --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java --- @@ -0,0 +1,286 @@ +package org.apache.streams.twitter.provider; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.typesafe.config.Config; +import 
org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.twitter.TwitterStreamConfiguration; +import org.apache.streams.twitter.TwitterUserInformationConfiguration; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; +import twitter4j.*; +import twitter4j.conf.ConfigurationBuilder; +import twitter4j.json.DataObjectFactory; + +import java.io.Serializable; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.*; + +public class TwitterUserInformationProvider implements StreamsProvider, Serializable +{ + + public static final String STREAMS_ID = "TwitterUserInformationProvider"; + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProvider.class); + + + private TwitterUserInformationConfiguration twitterUserInformationConfiguration; + + private Class klass; + protected volatile Queue providerQueue = new LinkedBlockingQueue(); + + public TwitterUserInformationConfiguration getConfig() { return twitterUserInformationConfiguration; } + + public void setConfig(TwitterUserInformationConfiguration config) { this.twitterUserInformationConfiguration = config; } + + protected Iterator idsBatches; + protected Iterator screenNameBatches; + + protected ListeningExecutorService executor; + + protected DateTime start; + protected DateTime end; + + private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { + return new ThreadPoolExecutor(nThreads, nThreads, + 5000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); + } + + public TwitterUserInformationProvider() { + Config config = 
StreamsConfigurator.config.getConfig("twitter"); + this.twitterUserInformationConfiguration = TwitterStreamConfigurator.detectTwitterUserInformationConfiguration(config); + + } + + public TwitterUserInformationProvider(TwitterUserInformationConfiguration config) { + this.twitterUserInformationConfiguration = config; + } + + public TwitterUserInformationProvider(Class klass) { + Config config = StreamsConfigurator.config.getConfig("twitter"); + this.twitterUserInformationConfiguration = TwitterStreamConfigurator.detectTwitterUserInformationConfiguration(config); + this.klass = klass; + } + + public TwitterUserInformationProvider(TwitterUserInformationConfiguration config, Class klass) { + this.twitterUserInformationConfiguration = config; + this.klass = klass; + } + + public Queue getProviderQueue() { + return this.providerQueue; + } + + @Override + public void startStream() { + // no op + } + + + private void loadBatch(Long[] ids) { + Twitter client = getTwitterClient(); + int keepTrying = 0; + + // keep trying to load, give it 5 attempts. + //while (keepTrying < 10) + while (keepTrying < 1) + { + try + { + long[] toQuery = new long[ids.length]; + for(int i = 0; i < ids.length; i++) + toQuery[i] = ids[i]; + + for (User tStat : client.lookupUsers(toQuery)) { + String json = DataObjectFactory.getRawJSON(tStat); + providerQueue.offer(new StreamsDatum(json)); + } + keepTrying = 10; + } + catch(TwitterException twitterException) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); + } + catch(Exception e) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, e); + } + } + } + + private void loadBatch(String[] ids) { + Twitter client = getTwitterClient(); + int keepTrying = 0; + + // keep trying to load, give it 5 attempts. 
+ //while (keepTrying < 10) + while (keepTrying < 1) + { + try + { + for (User tStat : client.lookupUsers(ids)) { + String json = DataObjectFactory.getRawJSON(tStat); + providerQueue.offer(new StreamsDatum(json)); + } + keepTrying = 10; + } + catch(TwitterException twitterException) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); + } + catch(Exception e) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, e); + } + } + } + + public StreamsResultSet readCurrent() { + + Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext()); + + LOGGER.info("readCurrent"); + + while(idsBatches.hasNext()) --- End diff -- Since readCurrent is a batch-mode processor, wouldn't you want each call to readCurrent to return the values for just one batch, rather than looping until every remaining batch has been drained? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---