streams-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mfranklin <...@git.apache.org>
Subject [GitHub] incubator-streams pull request: Streams 143
Date Tue, 12 Aug 2014 23:28:30 GMT
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/66#discussion_r16149886
  
    --- Diff: streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
---
    @@ -41,125 +46,126 @@ Licensed to the Apache Software Foundation (ASF) under one
         protected static final int MAX_ATTEMPTS = 5;
         protected static final int SLEEP_SECS = 5; //5 seconds
     
    -    protected Queue dataQueue; //exposed for testing
    -    private InstagramUserInformationConfiguration config;
    -    private Instagram instagramClient;
    +    protected Queue<MediaFeedData> dataQueue; //exposed for testing
    +    private InstagramConfiguration config;
         private AtomicBoolean isCompleted;
    +    private SimpleTokenManager<InstagramOauthToken> tokenManger;
    +    private int consecutiveErrorCount;
    +    private BackOffStrategy backOffStrategy;
     
     
    -    public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramUserInformationConfiguration
config) {
    +    public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramConfiguration
config) {
             this.dataQueue = queue;
             this.config = config;
    -        this.instagramClient = new Instagram(this.config.getClientId());
             this.isCompleted = new AtomicBoolean(false);
    +        this.tokenManger = new BasicTokenManger<InstagramOauthToken>();
    +        for (String clientId : this.config.getClientIds()) {
    +            this.tokenManger.addTokenToPool(new InstagramOauthToken(clientId));
    +        }
    +        this.consecutiveErrorCount = 0;
    +        this.backOffStrategy = new ExponentialBackOffStrategy(2);
         }
     
    -    /**
    -     * Set instagram client
    -     * @param instagramClient
    -     */
    -    protected void setInstagramClient(Instagram instagramClient) {
    -        this.instagramClient = instagramClient;
    +
    +    @VisibleForTesting
    +    protected Instagram getNextInstagramClient() {
    +        return new Instagram(this.tokenManger.getNextAvailableToken().getClientId());
         }
     
    -    /**
    -     * Gets the user ids from the {@link org.apache.streams.instagram.InstagramUserInformationConfiguration}
and
    -     * converts them to {@link java.lang.Long}
    -     * @return
    -     */
    -    protected Set<Long> getUserIds() {
    -        Set<Long> userIds = Sets.newHashSet();
    -        for(String id : config.getUserIds()) {
    -            try {
    -                userIds.add(Long.parseLong(id));
    -            } catch (NumberFormatException nfe) {
    -                LOGGER.error("Failed to parse user id, {}, to a long : {}", id, nfe.getMessage());
    +    private void queueData(MediaFeed userFeed, String userId) {
    +        if (userFeed == null) {
    +            LOGGER.error("User id, {}, returned a NULL media feed from instagram.", userId);
    +        } else {
    +            for (MediaFeedData data : userFeed.getData()) {
    +                this.dataQueue.offer(data);
                 }
             }
    -        return userIds;
         }
     
         /**
    -     * Determins the course of action to take when Instagram returns an exception to
a request.  If it is a rate limit
    -     * exception, it implements an exponentional back off strategy.  If it is anyother
exception, it is logged and
    -     * rethrown.
    -     * @param instaExec exception to handle
    -     * @param attempt number of attempts that have occured to pull this users information
    -     * @throws InstagramException
    +     * @return true when the collector has queued all of the available media feed data
for the provided users.
          */
    -    protected void handleInstagramException(InstagramException instaExec, int attempt)
throws InstagramException {
    -        LOGGER.debug("RemainingApiLimitStatus: {}", instaExec.getRemainingLimitStatus());
    -        if(instaExec.getRemainingLimitStatus() == 0) { //rate limit exception
    -            long sleepTime = Math.round(Math.pow(SLEEP_SECS, attempt)) * 1000;
    -            try {
    -                LOGGER.debug("Encountered rate limit exception, sleeping for {} ms",
sleepTime);
    -                Thread.sleep(sleepTime);
    -            } catch (InterruptedException ie) {
    -                Thread.currentThread().interrupt();
    +    public boolean isCompleted() {
    +        return this.isCompleted.get();
    +    }
    +
    +    @Override
    +    public void run() {
    +        try {
    +            for (UserId user : this.config.getUsersInfo().getUserIds()) {
    +                collectMediaFeed(user);
                 }
    -        } else {
    -            LOGGER.error("Instagram returned an excetpion to the user media request :
{}", instaExec.getMessage());
    -            throw instaExec;
    +        } catch (Exception e) {
    +            LOGGER.error("Shutting down InstagramCollector. Exception occured: {}", e.getMessage());
             }
    +        this.isCompleted.set(true);
         }
     
         /**
    -     * Gets the MediaFeedData for this particular user and adds it to the share queued.
    -     * @param userId
    +     * Pull Recement Media for a user and queues the resulting data. Will try a single
call 5 times before failing and
    +     * moving on to the next call or returning.
    +     * @param user
    +     * @throws Exception
          */
    -    private void getUserMedia(Long userId) {
    -        MediaFeed feed = null;
    -        int attempts = 0;
    -        int count = 0;
    +    @VisibleForTesting
    +    protected void collectMediaFeed(UserId user) throws Exception {
    +        Pagination pagination = null;
             do {
    -            ++attempts;
    -            try {
    -                feed = this.instagramClient.getRecentMediaFeed(userId);
    -                queueData(feed, userId);
    -                count += feed.getData().size();
    -                while(feed != null && feed.getPagination() != null &&
feed.getPagination().hasNextPage()) {
    -                    feed = this.instagramClient.getRecentMediaNextPage(feed.getPagination());
    -                    queueData(feed, userId);
    -                    count += feed.getData().size();
    -                }
    -            } catch (InstagramException ie) {
    +            int attempts = 0;
    +            boolean succesfullDataPull = false;
    +            while (!succesfullDataPull && attempts < MAX_ATTEMPTS) {
    +                ++attempts;
    +                MediaFeed feed = null;
                     try {
    -                    handleInstagramException(ie, attempts);
    -                } catch (InstagramException ie2) { //not a rate limit exception, ignore
user
    -                    attempts = MAX_ATTEMPTS;
    -                }
    -            }
    -        } while(feed == null && attempts < MAX_ATTEMPTS);
    -        LOGGER.debug("For user, {}, received {} MediaFeedData", userId, count);
    -    }
    -
    -    private void queueData(MediaFeed userFeed, Long userId) {
    -        if(userFeed == null) {
    -            LOGGER.error("User id, {}, returned a NULL media feed from instagram.", userId);
    -        } else {
    -            for(MediaFeedData data : userFeed.getData()) {
    -                synchronized (this.dataQueue) { //unnecessary
    -                    while(!this.dataQueue.offer(data)) {
    -                        Thread.yield();
    +                    if (pagination == null) {
    +                        feed = getNextInstagramClient().getRecentMediaFeed(Long.valueOf(user.getUserId()),
    +                                0,
    +                                null,
    +                                null,
    +                                user.getBeforeDate() == null ? null : user.getBeforeDate().toDate(),
    +                                user.getAfterDate() == null ? null : user.getAfterDate().toDate());
    +                    } else {
    +                        feed = getNextInstagramClient().getRecentMediaNextPage(pagination);
    +                    }
    +                } catch (Exception e) {
    +                    handleException(e);
    +                    if(e instanceof InstagramBadRequestException) {
    --- End diff --
    
    You are checking for InstagramBadRequestException here and in handleException


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message