streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mfrank...@apache.org
Subject [3/8] git commit: STREAMS-143 | Refactored InstagramProvider to accept date ranges for user updates
Date Mon, 18 Aug 2014 16:10:21 GMT
STREAMS-143 | Refactored InstagramProvider to accept date ranges for user updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9f296be7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9f296be7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9f296be7

Branch: refs/heads/master
Commit: 9f296be7a9fe5c4f0a6449e824cdadc8ad982add
Parents: ff5f821
Author: Ryan Ebanks <rebanks@Informations-MacBook-Pro-2.local>
Authored: Tue Aug 12 15:46:59 2014 -0500
Committer: Ryan Ebanks <rebanks@Informations-MacBook-Pro-2.local>
Committed: Tue Aug 12 15:46:59 2014 -0500

----------------------------------------------------------------------
 .../instagram/provider/InstagramOauthToken.java |  28 +++
 .../provider/InstagramRecentMediaCollector.java | 198 ++++++++++---------
 .../provider/InstagramRecentMediaProvider.java  |  35 +++-
 .../com/instagram/InstagramConfiguration.json   |  18 +-
 .../InstagramRecentMediaCollectorTest.java      | 138 ++++++-------
 .../util/ConcurentYieldTillSuccessQueue.java    |  27 +++
 6 files changed, 269 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9f296be7/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
new file mode 100644
index 0000000..9773f92
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
@@ -0,0 +1,28 @@
+package org.apache.streams.instagram.provider;
+
+import org.apache.streams.util.oauth.tokens.OauthToken;
+
+/**
+ *
+ */
+public class InstagramOauthToken extends OauthToken{
+
+    private String clientId;
+
+    public InstagramOauthToken(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    @Override
+    protected boolean internalEquals(Object o) {
+        if(!(o instanceof InstagramOauthToken)) {
+            return false;
+        }
+        InstagramOauthToken that = (InstagramOauthToken) o;
+        return this.clientId.equals(that.clientId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9f296be7/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
index 4f27e49..932828d 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
@@ -14,26 +14,31 @@ specific language governing permissions and limitations
 under the License. */
 package org.apache.streams.instagram.provider;
 
-import com.google.common.collect.Sets;
-import org.apache.streams.instagram.InstagramUserInformationConfiguration;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.UserId;
+import org.apache.streams.util.api.requests.backoff.BackOffException;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
+import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
+import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger;
 import org.jinstagram.Instagram;
+import org.jinstagram.entity.common.Pagination;
 import org.jinstagram.entity.users.feed.MediaFeed;
 import org.jinstagram.entity.users.feed.MediaFeedData;
-import org.jinstagram.exceptions.InstagramException;
+import org.jinstagram.exceptions.InstagramBadRequestException;
+import org.jinstagram.exceptions.InstagramRateLimitException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Executes on all of the Instagram requests to collect the media feed data.
- *
+ * <p/>
  * If errors/exceptions occur when trying to gather data for a particular user, that user
is skipped and the collector
- * move on to the next user.  If a rate limit exception occurs it employs an exponential
back off strategy for up to
- * 5 attempts.
- *
+ * moves on to the next user.  If a rate limit exception occurs it employs an exponential
back off strategy.
  */
 public class InstagramRecentMediaCollector implements Runnable {
 
@@ -41,125 +46,126 @@ public class InstagramRecentMediaCollector implements Runnable {
     protected static final int MAX_ATTEMPTS = 5;
     protected static final int SLEEP_SECS = 5; //5 seconds
 
-    protected Queue dataQueue; //exposed for testing
-    private InstagramUserInformationConfiguration config;
-    private Instagram instagramClient;
+    protected Queue<MediaFeedData> dataQueue; //exposed for testing
+    private InstagramConfiguration config;
     private AtomicBoolean isCompleted;
+    private SimpleTokenManager<InstagramOauthToken> tokenManger;
+    private int consecutiveErrorCount;
+    private BackOffStrategy backOffStrategy;
 
 
-    public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramUserInformationConfiguration
config) {
+    public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramConfiguration
config) {
         this.dataQueue = queue;
         this.config = config;
-        this.instagramClient = new Instagram(this.config.getClientId());
         this.isCompleted = new AtomicBoolean(false);
+        this.tokenManger = new BasicTokenManger<InstagramOauthToken>();
+        for (String clientId : this.config.getClientIds()) {
+            this.tokenManger.addTokenToPool(new InstagramOauthToken(clientId));
+        }
+        this.consecutiveErrorCount = 0;
+        this.backOffStrategy = new ExponentialBackOffStrategy(2);
     }
 
-    /**
-     * Set instagram client
-     * @param instagramClient
-     */
-    protected void setInstagramClient(Instagram instagramClient) {
-        this.instagramClient = instagramClient;
+
+    @VisibleForTesting
+    protected Instagram getNextInstagramClient() {
+        return new Instagram(this.tokenManger.getNextAvailableToken().getClientId());
     }
 
-    /**
-     * Gets the user ids from the {@link org.apache.streams.instagram.InstagramUserInformationConfiguration}
and
-     * converts them to {@link java.lang.Long}
-     * @return
-     */
-    protected Set<Long> getUserIds() {
-        Set<Long> userIds = Sets.newHashSet();
-        for(String id : config.getUserIds()) {
-            try {
-                userIds.add(Long.parseLong(id));
-            } catch (NumberFormatException nfe) {
-                LOGGER.error("Failed to parse user id, {}, to a long : {}", id, nfe.getMessage());
+    private void queueData(MediaFeed userFeed, String userId) {
+        if (userFeed == null) {
+            LOGGER.error("User id, {}, returned a NULL media feed from instagram.", userId);
+        } else {
+            for (MediaFeedData data : userFeed.getData()) {
+                this.dataQueue.offer(data);
             }
         }
-        return userIds;
     }
 
     /**
-     * Determins the course of action to take when Instagram returns an exception to a request.
 If it is a rate limit
-     * exception, it implements an exponentional back off strategy.  If it is anyother exception,
it is logged and
-     * rethrown.
-     * @param instaExec exception to handle
-     * @param attempt number of attempts that have occured to pull this users information
-     * @throws InstagramException
+     * @return true when the collector has queued all of the available media feed data for
the provided users.
      */
-    protected void handleInstagramException(InstagramException instaExec, int attempt) throws
InstagramException {
-        LOGGER.debug("RemainingApiLimitStatus: {}", instaExec.getRemainingLimitStatus());
-        if(instaExec.getRemainingLimitStatus() == 0) { //rate limit exception
-            long sleepTime = Math.round(Math.pow(SLEEP_SECS, attempt)) * 1000;
-            try {
-                LOGGER.debug("Encountered rate limit exception, sleeping for {} ms", sleepTime);
-                Thread.sleep(sleepTime);
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
+    public boolean isCompleted() {
+        return this.isCompleted.get();
+    }
+
+    @Override
+    public void run() {
+        try {
+            for (UserId user : this.config.getUsersInfo().getUserIds()) {
+                collectMediaFeed(user);
             }
-        } else {
-            LOGGER.error("Instagram returned an excetpion to the user media request : {}",
instaExec.getMessage());
-            throw instaExec;
+        } catch (Exception e) {
+            LOGGER.error("Shutting down InstagramCollector. Exception occured: {}", e.getMessage());
         }
+        this.isCompleted.set(true);
     }
 
     /**
-     * Gets the MediaFeedData for this particular user and adds it to the share queued.
-     * @param userId
+     * Pulls recent media for a user and queues the resulting data. Will try a single call
5 times before failing and
+     * moving on to the next call or returning.
+     * @param user
+     * @throws Exception
      */
-    private void getUserMedia(Long userId) {
-        MediaFeed feed = null;
-        int attempts = 0;
-        int count = 0;
+    @VisibleForTesting
+    protected void collectMediaFeed(UserId user) throws Exception {
+        Pagination pagination = null;
         do {
-            ++attempts;
-            try {
-                feed = this.instagramClient.getRecentMediaFeed(userId);
-                queueData(feed, userId);
-                count += feed.getData().size();
-                while(feed != null && feed.getPagination() != null && feed.getPagination().hasNextPage())
{
-                    feed = this.instagramClient.getRecentMediaNextPage(feed.getPagination());
-                    queueData(feed, userId);
-                    count += feed.getData().size();
-                }
-            } catch (InstagramException ie) {
+            int attempts = 0;
+            boolean succesfullDataPull = false;
+            while (!succesfullDataPull && attempts < MAX_ATTEMPTS) {
+                ++attempts;
+                MediaFeed feed = null;
                 try {
-                    handleInstagramException(ie, attempts);
-                } catch (InstagramException ie2) { //not a rate limit exception, ignore user
-                    attempts = MAX_ATTEMPTS;
-                }
-            }
-        } while(feed == null && attempts < MAX_ATTEMPTS);
-        LOGGER.debug("For user, {}, received {} MediaFeedData", userId, count);
-    }
-
-    private void queueData(MediaFeed userFeed, Long userId) {
-        if(userFeed == null) {
-            LOGGER.error("User id, {}, returned a NULL media feed from instagram.", userId);
-        } else {
-            for(MediaFeedData data : userFeed.getData()) {
-                synchronized (this.dataQueue) { //unnecessary
-                    while(!this.dataQueue.offer(data)) {
-                        Thread.yield();
+                    if (pagination == null) {
+                        feed = getNextInstagramClient().getRecentMediaFeed(Long.valueOf(user.getUserId()),
+                                0,
+                                null,
+                                null,
+                                user.getBeforeDate() == null ? null : user.getBeforeDate().toDate(),
+                                user.getAfterDate() == null ? null : user.getAfterDate().toDate());
+                    } else {
+                        feed = getNextInstagramClient().getRecentMediaNextPage(pagination);
+                    }
+                } catch (Exception e) {
+                    handleException(e);
+                    if(e instanceof InstagramBadRequestException) {
+                        attempts = MAX_ATTEMPTS; //don't repeat bad requests.
                     }
+                    if(this.consecutiveErrorCount > Math.max(this.tokenManger.numAvailableTokens(),
MAX_ATTEMPTS*2)) {
+                        throw new Exception("InstagramCollector failed to successfully connect
to instagram on "+this.consecutiveErrorCount+" attempts.");
+                    }
+                }
+                if(succesfullDataPull = feed != null) {
+                    this.consecutiveErrorCount = 0;
+                    this.backOffStrategy.reset();
+                    pagination = feed.getPagination();
+                    queueData(feed, user.getUserId());
                 }
             }
-        }
+            if(!succesfullDataPull) {
+                LOGGER.error("Failed to get data from instagram for user id, {}, skipping
user.", user.getUserId());
+            }
+        } while (pagination != null && pagination.hasNextPage());
     }
 
     /**
-     *
-     * @return true when the collector has queued all of available media feed data for the
provided users.
+     * Handles different types of {@link java.lang.Exception} caught while trying to pull
Instagram data.
+     * Backs off (sleeps) when it encounters a rate limit exception.
+     * @param e
+     * @throws BackOffException
      */
-    public boolean isCompleted() {
-        return this.isCompleted.get();
-    }
-
-    @Override
-    public void run() {
-        for(Long userId : getUserIds()) {
-            getUserMedia(userId);
+    protected void handleException(Exception e) throws BackOffException {
+        if(e instanceof InstagramRateLimitException) {
+            LOGGER.warn("Received rate limit exception from Instagram, backing off. : {}",
e);
+            this.backOffStrategy.backOff();
+        } else if(e instanceof InstagramBadRequestException) {
+            LOGGER.error("Received Bad Requests exception form Instagram: {}", e);
+            ++this.consecutiveErrorCount;
+        } else {
+            LOGGER.error("Received Expection while attempting to poll Instagram: {}", e);
+            ++this.consecutiveErrorCount;
         }
-        this.isCompleted.set(true);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9f296be7/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
index 30ddda4..e3e9399 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
@@ -14,17 +14,19 @@ specific language governing permissions and limitations
 under the License. */
 package org.apache.streams.instagram.provider;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.instagram.InstagramConfigurator;
-import org.apache.streams.instagram.InstagramUserInformationConfiguration;
+import org.apache.streams.instagram.*;
+import org.apache.streams.util.SerializationUtil;
 import org.jinstagram.entity.users.feed.MediaFeedData;
 import org.joda.time.DateTime;
 
 import java.math.BigInteger;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -36,17 +38,17 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class InstagramRecentMediaProvider implements StreamsProvider {
 
-    private InstagramUserInformationConfiguration config;
+    private InstagramConfiguration config;
     private InstagramRecentMediaCollector dataCollector;
     protected Queue<MediaFeedData> mediaFeedQueue; //exposed for testing
     private ExecutorService executorService;
     private AtomicBoolean isCompleted;
 
     public InstagramRecentMediaProvider() {
-        this(InstagramConfigurator.detectInstagramUserInformationConfiguration(StreamsConfigurator.config.getConfig("instagram")));
+        this(InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram")));
     }
 
-    public InstagramRecentMediaProvider(InstagramUserInformationConfiguration config) {
+    public InstagramRecentMediaProvider(InstagramConfiguration config) {
         this.config = config;
         this.mediaFeedQueue = Queues.newConcurrentLinkedQueue();
     }
@@ -62,6 +64,7 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
      * EXPOSED FOR TESTING
      * @return
      */
+    @VisibleForTesting
     protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
         return new InstagramRecentMediaCollector(this.mediaFeedQueue, this.config);
     }
@@ -112,4 +115,26 @@ public class InstagramRecentMediaProvider implements StreamsProvider
{
             this.executorService = null;
         }
     }
+
+    /**
+     * Add default start and stop points if necessary.
+     */
+    private void updateUserInfoList() {
+        UsersInfo usersInfo = this.config.getUsersInfo();
+        if(usersInfo.getDefaultAfterDate() == null && usersInfo.getDefaultBeforeDate()
== null) {
+            return;
+        }
+        DateTime defaultAfterDate = usersInfo.getDefaultAfterDate();
+        DateTime defaultBeforeDate = usersInfo.getDefaultBeforeDate();
+        for(UserId user : usersInfo.getUserIds()) {
+            if(defaultAfterDate != null && user.getAfterDate() == null) {
+                user.setAfterDate(defaultAfterDate);
+            }
+            if(defaultBeforeDate != null && user.getBeforeDate() == null) {
+                user.setBeforeDate(defaultBeforeDate);
+            }
+        }
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9f296be7/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
index 7b894a2..c662660 100644
--- a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
+++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
@@ -5,9 +5,13 @@
     "javaType" : "org.apache.streams.instagram.InstagramConfiguration",
     "javaInterfaces": ["java.io.Serializable"],
     "properties": {
-        "clientId": {
-            "type": "string",
-            "description": "Your Instagram Client Id"
+        "clientIds": {
+            "type": "array",
+            "uniqueItems": true,
+            "items": {
+                "type": "string"
+            },
+            "description": "Your Instagram Client Ids"
         },
         "usersInfo": {
             "type": "object",
@@ -23,12 +27,12 @@
                 },
                 "defaultAfterDate": {
                     "type": "string",
-                    "format": "datetime",
+                    "format": "date-time",
                     "description": "If the api allows to gather data by date range, this
date will be used as the start of the range for the request for all users that don't have
date ranges specified. If this is null it will pull from the earliest possible time"
                 },
                 "defaultBeforeDate": {
                     "type": "string",
-                    "format": "datetime",
+                    "format": "date-time",
                     "description": "If the api allows to gather data by date range, this
date will be used as the end of the range for the request for all users that don't have date
ranges specified. If this is null it will pull till current time."
                 }
             }
@@ -44,12 +48,12 @@
                 },
                 "afterDate": {
                     "type": "string",
-                    "format": "datetime",
+                    "format": "date-time",
                     "description": "If the api allows to gather data by date range, this
date will be used as the start of the range for the request for this user. If this is null
it will use the defaultAfterDate."
                 },
                 "beforeDate": {
                     "type": "string",
-                    "format": "datetime",
+                    "format": "date-time",
                     "description": "If the api allows to gather data by date range, this
date will be used as the end of the range for the request for this user. If this is null
it will use the defaultBeforeDate."
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9f296be7/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
index 0225b40..74b5139 100644
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
@@ -14,16 +14,25 @@ specific language governing permissions and limitations
 under the License. */
 package org.apache.streams.instagram.provider;
 
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
+import org.apache.streams.instagram.InstagramConfiguration;
 import org.apache.streams.instagram.InstagramUserInformationConfiguration;
+import org.apache.streams.instagram.UserId;
+import org.apache.streams.instagram.UsersInfo;
+import org.apache.streams.util.ConcurentYieldTillSuccessQueue;
 import org.jinstagram.Instagram;
 import org.jinstagram.entity.common.Pagination;
 import org.jinstagram.entity.users.feed.MediaFeed;
 import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.jinstagram.exceptions.InstagramBadRequestException;
 import org.jinstagram.exceptions.InstagramException;
+import org.jinstagram.exceptions.InstagramRateLimitException;
+import org.joda.time.DateTime;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -34,91 +43,63 @@ import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.junit.Assert.*;
-import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link org.apache.streams.instagram.provider.InstagramRecentMediaCollector}
  */
-public class InstagramRecentMediaCollectorTest {
+public class InstagramRecentMediaCollectorTest extends RandomizedTest {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollectorTest.class);
 
     private int expectedDataCount = 0;
-    private long randomSeed = System.currentTimeMillis();
-    private Random rand = new Random(randomSeed);
-    private Map<Pagination, MediaFeed> pageMap = Maps.newHashMap();
 
     @Test
     public void testHandleInstagramException1() throws InstagramException {
-        InstagramException ie = mock(InstagramException.class);
+        InstagramException ie = mock(InstagramRateLimitException.class);
         when(ie.getRemainingLimitStatus()).thenReturn(1);
         final String message = "Test Message";
         when(ie.getMessage()).thenReturn(message);
-        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(),
new InstagramUserInformationConfiguration());
+        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurentYieldTillSuccessQueue<MediaFeedData>(),
new InstagramConfiguration());
         try {
-            collector.handleInstagramException(ie, 1);
-            fail("Expected RuntimeException to be thrown");
-        } catch (InstagramException rte) {
-//            assertTrue(rte.getMessage().contains("Mock for InstagramException"));
-            assertEquals(message, rte.getMessage());
+            long startTime = System.currentTimeMillis();
+            collector.handleException(ie);
+            long endTime = System.currentTimeMillis();
+            assertTrue(2000 <= endTime - startTime);
+            startTime = System.currentTimeMillis();
+            collector.handleException(ie);
+            endTime = System.currentTimeMillis();
+            assertTrue(4000 <= endTime - startTime);
+        } catch (Exception e) {
+            fail("Should not have thrown an exception.");
         }
     }
 
-    @Test
-    public void testHandleInstagramException2() throws InstagramException{
-        InstagramException ie = mock(InstagramException.class);
-        when(ie.getRemainingLimitStatus()).thenReturn(0);
-        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(),
new InstagramUserInformationConfiguration());
-        long startTime = System.currentTimeMillis();
-        collector.handleInstagramException(ie, 1);
-        long endTime = System.currentTimeMillis();
-        LOGGER.debug("Slept for {} ms", startTime - endTime);
-        assertTrue(endTime - startTime >= 4000); //allow for 1 sec of error
-        startTime = System.currentTimeMillis();
-        collector.handleInstagramException(ie, 2);
-        endTime = System.currentTimeMillis();
-        LOGGER.debug("Slept for {} ms", startTime - endTime);
-        assertTrue(endTime - startTime >= 24000); //allow for 1 sec of error
-    }
-
-    @Test
-    public void testGetUserIds() {
-        InstagramUserInformationConfiguration config = new InstagramUserInformationConfiguration();
-        List<String> userIds = Lists.newLinkedList();
-        userIds.add("1");
-        userIds.add("2");
-        userIds.add("3");
-        userIds.add("4");
-        userIds.add("abcdefg");
-        config.setUserIds(userIds);
-        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(),
config);
-
-        Set<Long> expected = Sets.newHashSet();
-        expected.add(1L);
-        expected.add(2L);
-        expected.add(3L);
-        expected.add(4L);
-
-        assertEquals(expected, collector.getUserIds());
-    }
 
     @Test
+    @Repeat(iterations = 3)
     public void testRun() {
-        Queue<MediaFeedData> data = Queues.newConcurrentLinkedQueue();
-        InstagramUserInformationConfiguration config = new InstagramUserInformationConfiguration();
-        List<String> userIds = Lists.newLinkedList();
-        userIds.add("1");
-        userIds.add("2");
-        userIds.add("3");
-        userIds.add("4");
-        config.setUserIds(userIds);
-        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(data,
config);
-        collector.setInstagramClient(createMockInstagramClient());
+        this.expectedDataCount = 0;
+        Queue<MediaFeedData> data = new ConcurentYieldTillSuccessQueue<MediaFeedData>();
+        InstagramConfiguration config = new InstagramConfiguration();
+        UsersInfo usersInfo = new UsersInfo();
+        config.setUsersInfo(usersInfo);
+        Set<UserId> users = creatUsers(randomIntBetween(0, 100));
+        usersInfo.setUserIds(users);
+
+        final Instagram mockInstagram = createMockInstagramClient();
+        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(data,
config) {
+            @Override
+            protected Instagram getNextInstagramClient() {
+                return mockInstagram;
+            }
+        };
+        assertFalse(collector.isCompleted());
         collector.run();
-        LOGGER.debug("Random seed == {}", randomSeed);
-        assertEquals("Random Seed == " + randomSeed, this.expectedDataCount, data.size());
+        assertTrue(collector.isCompleted());
+        assertEquals(this.expectedDataCount, data.size());
     }
 
     private Instagram createMockInstagramClient() {
@@ -127,12 +108,19 @@ public class InstagramRecentMediaCollectorTest {
             final InstagramException mockException = mock(InstagramException.class);
             when(mockException.getRemainingLimitStatus()).thenReturn(-1);
             when(mockException.getMessage()).thenReturn("MockInstagramException message");
-            when(instagramClient.getRecentMediaFeed(any(Long.class))).thenAnswer(new Answer<MediaFeed>()
{
+            when(instagramClient.getRecentMediaFeed(anyLong(), anyInt(), anyString(), anyString(),
any(Date.class), any(Date.class))).thenAnswer(new Answer<MediaFeed>() {
                 @Override
                 public MediaFeed answer(InvocationOnMock invocationOnMock) throws Throwable
{
-                    long param = (Long) invocationOnMock.getArguments()[0];
-                    if (param == 2L) {
-                        throw mockException;
+                    if (randomInt(20) == 0) { //5% throw exceptions
+                        int type = randomInt(4);
+                        if (type == 0)
+                            throw mock(InstagramRateLimitException.class);
+                        else if (type == 1)
+                            throw mock(InstagramBadRequestException.class);
+                        else if (type == 2)
+                            throw mock(InstagramException.class);
+                        else
+                            throw new Exception();
                     } else {
                         return createRandomMockMediaFeed();
                     }
@@ -150,11 +138,27 @@ public class InstagramRecentMediaCollectorTest {
         return instagramClient;
     }
 
+    private Set<UserId> creatUsers(int numUsers) {
+        Set<UserId> users = Sets.newHashSet();
+        for(int i=0; i < numUsers; ++i) {
+            UserId user = new UserId();
+            user.setUserId(Integer.toString(randomInt()));
+            if(randomInt(2) == 0) {
+                user.setAfterDate(DateTime.now().minusSeconds(randomIntBetween(0, 1000)));
+            }
+            if(randomInt(2) == 0) {
+                user.setBeforeDate(DateTime.now());
+            }
+            users.add(user);
+        }
+        return users;
+    }
+
     private MediaFeed createRandomMockMediaFeed() throws InstagramException {
         MediaFeed feed = mock(MediaFeed.class);
-        when(feed.getData()).thenReturn(createData(this.rand.nextInt(100)));
+        when(feed.getData()).thenReturn(createData(randomInt(100)));
         Pagination pagination = mock(Pagination.class);
-        if(this.rand.nextInt(2) == 0) {
+        if(randomInt(2) == 0) {
             when(pagination.hasNextPage()).thenReturn(true);
         } else {
             when(pagination.hasNextPage()).thenReturn(false);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9f296be7/streams-util/src/main/java/org/apache/streams/util/ConcurentYieldTillSuccessQueue.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/ConcurentYieldTillSuccessQueue.java
b/streams-util/src/main/java/org/apache/streams/util/ConcurentYieldTillSuccessQueue.java
new file mode 100644
index 0000000..e438f54
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/ConcurentYieldTillSuccessQueue.java
@@ -0,0 +1,27 @@
+package org.apache.streams.util;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * A {@link java.util.concurrent.ConcurrentLinkedQueue} implementation that causes thread
yields when data is not
+ * successfully offered or polled.
+ */
+public class ConcurentYieldTillSuccessQueue<T> extends ConcurrentLinkedQueue<T>
{
+
+    @Override
+    public T poll() {
+        T item = null;
+        while(!super.isEmpty() && (item = super.poll()) == null) {
+            Thread.yield();;
+        }
+        return item;
+    }
+
+    @Override
+    public boolean offer(T t) {
+        while(!super.offer(t)) {
+            Thread.yield();
+        }
+        return true;
+    }
+}


Mime
View raw message