streams-dev mailing list archives

From rbnks <...@git.apache.org>
Subject [GitHub] incubator-streams pull request: Streams 143
Date Wed, 13 Aug 2014 20:36:08 GMT
Github user rbnks commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/66#discussion_r16203670
  
    --- Diff: streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java ---
    @@ -41,125 +46,126 @@ Licensed to the Apache Software Foundation (ASF) under one
         protected static final int MAX_ATTEMPTS = 5;
         protected static final int SLEEP_SECS = 5; //5 seconds
     
    -    protected Queue dataQueue; //exposed for testing
    -    private InstagramUserInformationConfiguration config;
    -    private Instagram instagramClient;
    +    protected Queue<MediaFeedData> dataQueue; //exposed for testing
    +    private InstagramConfiguration config;
         private AtomicBoolean isCompleted;
    +    private SimpleTokenManager<InstagramOauthToken> tokenManger;
    +    private int consecutiveErrorCount;
    +    private BackOffStrategy backOffStrategy;
     
     
    -    public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramUserInformationConfiguration config) {
    +    public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramConfiguration config) {
             this.dataQueue = queue;
             this.config = config;
    -        this.instagramClient = new Instagram(this.config.getClientId());
             this.isCompleted = new AtomicBoolean(false);
    +        this.tokenManger = new BasicTokenManger<InstagramOauthToken>();
    +        for (String clientId : this.config.getClientIds()) {
    +            this.tokenManger.addTokenToPool(new InstagramOauthToken(clientId));
    +        }
    +        this.consecutiveErrorCount = 0;
    +        this.backOffStrategy = new ExponentialBackOffStrategy(2);
         }
     
    -    /**
    -     * Set instagram client
    -     * @param instagramClient
    -     */
    -    protected void setInstagramClient(Instagram instagramClient) {
    -        this.instagramClient = instagramClient;
    +
    +    @VisibleForTesting
    +    protected Instagram getNextInstagramClient() {
    +        return new Instagram(this.tokenManger.getNextAvailableToken().getClientId());
         }
     
    -    /**
    -     * Gets the user ids from the {@link org.apache.streams.instagram.InstagramUserInformationConfiguration} and
    -     * converts them to {@link java.lang.Long}
    -     * @return
    -     */
    -    protected Set<Long> getUserIds() {
    -        Set<Long> userIds = Sets.newHashSet();
    -        for(String id : config.getUserIds()) {
    -            try {
    -                userIds.add(Long.parseLong(id));
    -            } catch (NumberFormatException nfe) {
    -                LOGGER.error("Failed to parse user id, {}, to a long : {}", id, nfe.getMessage());
    +    private void queueData(MediaFeed userFeed, String userId) {
    +        if (userFeed == null) {
    +            LOGGER.error("User id, {}, returned a NULL media feed from instagram.", userId);
    +        } else {
    +            for (MediaFeedData data : userFeed.getData()) {
    +                this.dataQueue.offer(data);
    --- End diff --
    
    Previously I had created a util to handle poll and offer failures caused by a thread being
locked out of the queue for concurrent utils, but I have changed to use the component utils
now that the locks have been removed.
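
For readers following the thread, here is a rough sketch of what a helper for failed offers could look like. It is not the util from this pull request and not the component utils mentioned above: the class name `QueueOfferSketch`, the method `offerWithRetry`, and its parameters are all hypothetical. It assumes only that `Queue.offer` returns `false` when the queue cannot accept the element at that moment.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical illustration only; none of these names come from incubator-streams.
public class QueueOfferSketch {

    /**
     * Offers an item to the queue, retrying with a short sleep while the
     * queue rejects it. Returns true on success, or false once maxAttempts
     * offers have failed or the thread is interrupted while waiting.
     */
    public static <T> boolean offerWithRetry(Queue<T> queue, T item,
                                             int maxAttempts, long sleepMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (queue.offer(item)) {
                return true;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException ie) {
                // Restore the interrupt flag and give up on this item.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // A bounded queue of capacity 1 rejects a second offer until drained.
        Queue<String> queue = new ArrayBlockingQueue<>(1);
        System.out.println(offerWithRetry(queue, "first", 3, 10));
        System.out.println(offerWithRetry(queue, "second", 3, 10));
    }
}
```

With the diff above, a call such as `this.dataQueue.offer(data)` ignores the boolean result, so a caller that cares about dropped items would need to check it or go through a helper along these lines.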


