Return-Path: X-Original-To: apmail-streams-commits-archive@minotaur.apache.org Delivered-To: apmail-streams-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E3C4811325 for ; Mon, 18 Aug 2014 16:10:52 +0000 (UTC) Received: (qmail 16571 invoked by uid 500); 18 Aug 2014 16:10:46 -0000 Delivered-To: apmail-streams-commits-archive@streams.apache.org Received: (qmail 16547 invoked by uid 500); 18 Aug 2014 16:10:46 -0000 Mailing-List: contact commits-help@streams.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@streams.incubator.apache.org Delivered-To: mailing list commits@streams.incubator.apache.org Received: (qmail 16526 invoked by uid 99); 18 Aug 2014 16:10:45 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 18 Aug 2014 16:10:45 +0000 X-ASF-Spam-Status: No, hits=-2000.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 18 Aug 2014 16:10:40 +0000 Received: (qmail 10089 invoked by uid 99); 18 Aug 2014 16:10:20 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 18 Aug 2014 16:10:20 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 05F779B5457; Mon, 18 Aug 2014 16:10:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mfranklin@apache.org To: commits@streams.incubator.apache.org Date: Mon, 18 Aug 2014 16:10:21 -0000 Message-Id: <8df5b16903474eb78e4e2b1efddd2082@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/8] git commit: STREAMS-143 | Refactored InstagramProvider to accept date ranges for user updates X-Virus-Checked: Checked by ClamAV on apache.org STREAMS-143 | Refactored InstagramProvider to accept date ranges for user updates Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9f296be7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9f296be7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9f296be7 Branch: refs/heads/master Commit: 9f296be7a9fe5c4f0a6449e824cdadc8ad982add Parents: ff5f821 Author: Ryan Ebanks Authored: Tue Aug 12 15:46:59 2014 -0500 Committer: Ryan Ebanks Committed: Tue Aug 12 15:46:59 2014 -0500 ---------------------------------------------------------------------- .../instagram/provider/InstagramOauthToken.java | 28 +++ .../provider/InstagramRecentMediaCollector.java | 198 ++++++++++--------- .../provider/InstagramRecentMediaProvider.java | 35 +++- .../com/instagram/InstagramConfiguration.json | 18 +- .../InstagramRecentMediaCollectorTest.java | 138 ++++++------- .../util/ConcurentYieldTillSuccessQueue.java | 27 +++ 6 files changed, 269 insertions(+), 175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9f296be7/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java new file mode 100644 index 0000000..9773f92 --- /dev/null +++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java @@ -0,0 +1,28 @@ +package org.apache.streams.instagram.provider; + +import org.apache.streams.util.oauth.tokens.OauthToken; + +/** + * + */ +public class InstagramOauthToken extends OauthToken{ + + private String clientId; + + public InstagramOauthToken(String clientId) { + this.clientId = clientId; + } + + public String getClientId() { + return clientId; + } + + @Override + protected boolean internalEquals(Object o) { + if(!(o instanceof InstagramOauthToken)) { + return false; + } + InstagramOauthToken that = (InstagramOauthToken) o; + return this.clientId.equals(that.clientId); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9f296be7/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java index 4f27e49..932828d 100644 --- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java +++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java @@ -14,26 +14,31 @@ specific language governing permissions and limitations under the License. */ package org.apache.streams.instagram.provider; -import com.google.common.collect.Sets; -import org.apache.streams.instagram.InstagramUserInformationConfiguration; +import com.google.common.annotations.VisibleForTesting; +import org.apache.streams.instagram.InstagramConfiguration; +import org.apache.streams.instagram.UserId; +import org.apache.streams.util.api.requests.backoff.BackOffException; +import org.apache.streams.util.api.requests.backoff.BackOffStrategy; +import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy; +import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager; +import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger; import org.jinstagram.Instagram; +import org.jinstagram.entity.common.Pagination; import org.jinstagram.entity.users.feed.MediaFeed; import org.jinstagram.entity.users.feed.MediaFeedData; -import org.jinstagram.exceptions.InstagramException; +import org.jinstagram.exceptions.InstagramBadRequestException; +import org.jinstagram.exceptions.InstagramRateLimitException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Queue; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; /** * Executes on all of the Instagram requests to collect the media feed data. - * + *

* If errors/exceptions occur when trying to gather data for a particular user, that user is skipped and the collector - * move on to the next user. If a rate limit exception occurs it employs an exponential back off strategy for up to - * 5 attempts. - * + * move on to the next user. If a rate limit exception occurs it employs an exponential back off strategy. */ public class InstagramRecentMediaCollector implements Runnable { @@ -41,125 +46,126 @@ public class InstagramRecentMediaCollector implements Runnable { protected static final int MAX_ATTEMPTS = 5; protected static final int SLEEP_SECS = 5; //5 seconds - protected Queue dataQueue; //exposed for testing - private InstagramUserInformationConfiguration config; - private Instagram instagramClient; + protected Queue dataQueue; //exposed for testing + private InstagramConfiguration config; private AtomicBoolean isCompleted; + private SimpleTokenManager tokenManger; + private int consecutiveErrorCount; + private BackOffStrategy backOffStrategy; - public InstagramRecentMediaCollector(Queue queue, InstagramUserInformationConfiguration config) { + public InstagramRecentMediaCollector(Queue queue, InstagramConfiguration config) { this.dataQueue = queue; this.config = config; - this.instagramClient = new Instagram(this.config.getClientId()); this.isCompleted = new AtomicBoolean(false); + this.tokenManger = new BasicTokenManger(); + for (String clientId : this.config.getClientIds()) { + this.tokenManger.addTokenToPool(new InstagramOauthToken(clientId)); + } + this.consecutiveErrorCount = 0; + this.backOffStrategy = new ExponentialBackOffStrategy(2); } - /** - * Set instagram client - * @param instagramClient - */ - protected void setInstagramClient(Instagram instagramClient) { - this.instagramClient = instagramClient; + + @VisibleForTesting + protected Instagram getNextInstagramClient() { + return new Instagram(this.tokenManger.getNextAvailableToken().getClientId()); } - /** - * Gets the user ids from the {@link org.apache.streams.instagram.InstagramUserInformationConfiguration} and - * converts them to {@link java.lang.Long} - * @return - */ - protected Set getUserIds() { - Set userIds = Sets.newHashSet(); - for(String id : config.getUserIds()) { - try { - userIds.add(Long.parseLong(id)); - } catch (NumberFormatException nfe) { - LOGGER.error("Failed to parse user id, {}, to a long : {}", id, nfe.getMessage()); + private void queueData(MediaFeed userFeed, String userId) { + if (userFeed == null) { + LOGGER.error("User id, {}, returned a NULL media feed from instagram.", userId); + } else { + for (MediaFeedData data : userFeed.getData()) { + this.dataQueue.offer(data); } } - return userIds; } /** - * Determins the course of action to take when Instagram returns an exception to a request. If it is a rate limit - * exception, it implements an exponentional back off strategy. If it is anyother exception, it is logged and - * rethrown. - * @param instaExec exception to handle - * @param attempt number of attempts that have occured to pull this users information - * @throws InstagramException + * @return true when the collector has queued all of the available media feed data for the provided users. */ - protected void handleInstagramException(InstagramException instaExec, int attempt) throws InstagramException { - LOGGER.debug("RemainingApiLimitStatus: {}", instaExec.getRemainingLimitStatus()); - if(instaExec.getRemainingLimitStatus() == 0) { //rate limit exception - long sleepTime = Math.round(Math.pow(SLEEP_SECS, attempt)) * 1000; - try { - LOGGER.debug("Encountered rate limit exception, sleeping for {} ms", sleepTime); - Thread.sleep(sleepTime); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); + public boolean isCompleted() { + return this.isCompleted.get(); + } + + @Override + public void run() { + try { + for (UserId user : this.config.getUsersInfo().getUserIds()) { + collectMediaFeed(user); } - } else { - LOGGER.error("Instagram returned an excetpion to the user media request : {}", instaExec.getMessage()); - throw instaExec; + } catch (Exception e) { + LOGGER.error("Shutting down InstagramCollector. Exception occured: {}", e.getMessage()); } + this.isCompleted.set(true); } /** - * Gets the MediaFeedData for this particular user and adds it to the share queued. - * @param userId + * Pull Recement Media for a user and queues the resulting data. Will try a single call 5 times before failing and + * moving on to the next call or returning. + * @param user + * @throws Exception */ - private void getUserMedia(Long userId) { - MediaFeed feed = null; - int attempts = 0; - int count = 0; + @VisibleForTesting + protected void collectMediaFeed(UserId user) throws Exception { + Pagination pagination = null; do { - ++attempts; - try { - feed = this.instagramClient.getRecentMediaFeed(userId); - queueData(feed, userId); - count += feed.getData().size(); - while(feed != null && feed.getPagination() != null && feed.getPagination().hasNextPage()) { - feed = this.instagramClient.getRecentMediaNextPage(feed.getPagination()); - queueData(feed, userId); - count += feed.getData().size(); - } - } catch (InstagramException ie) { + int attempts = 0; + boolean succesfullDataPull = false; + while (!succesfullDataPull && attempts < MAX_ATTEMPTS) { + ++attempts; + MediaFeed feed = null; try { - handleInstagramException(ie, attempts); - } catch (InstagramException ie2) { //not a rate limit exception, ignore user - attempts = MAX_ATTEMPTS; - } - } - } while(feed == null && attempts < MAX_ATTEMPTS); - LOGGER.debug("For user, {}, received {} MediaFeedData", userId, count); - } - - private void queueData(MediaFeed userFeed, Long userId) { - if(userFeed == null) { - LOGGER.error("User id, {}, returned a NULL media feed from instagram.", userId); - } else { - for(MediaFeedData data : userFeed.getData()) { - synchronized (this.dataQueue) { //unnecessary - while(!this.dataQueue.offer(data)) { - Thread.yield(); + if (pagination == null) { + feed = getNextInstagramClient().getRecentMediaFeed(Long.valueOf(user.getUserId()), + 0, + null, + null, + user.getBeforeDate() == null ? null : user.getBeforeDate().toDate(), + user.getAfterDate() == null ? null : user.getAfterDate().toDate()); + } else { + feed = getNextInstagramClient().getRecentMediaNextPage(pagination); + } + } catch (Exception e) { + handleException(e); + if(e instanceof InstagramBadRequestException) { + attempts = MAX_ATTEMPTS; //don't repeat bad requests. } + if(this.consecutiveErrorCount > Math.max(this.tokenManger.numAvailableTokens(), MAX_ATTEMPTS*2)) { + throw new Exception("InstagramCollector failed to successfully connect to instagram on "+this.consecutiveErrorCount+" attempts."); + } + } + if(succesfullDataPull = feed != null) { + this.consecutiveErrorCount = 0; + this.backOffStrategy.reset(); + pagination = feed.getPagination(); + queueData(feed, user.getUserId()); } } - } + if(!succesfullDataPull) { + LOGGER.error("Failed to get data from instagram for user id, {}, skipping user.", user.getUserId()); + } + } while (pagination != null && pagination.hasNextPage()); } /** - * - * @return true when the collector has queued all of available media feed data for the provided users. + * Handles different types of {@link java.lang.Exception} caught while trying to pull Instagram data. + * BackOffs/Sleeps when it encounters a rate limit expection.. + * @param e + * @throws BackOffException */ - public boolean isCompleted() { - return this.isCompleted.get(); - } - - @Override - public void run() { - for(Long userId : getUserIds()) { - getUserMedia(userId); + protected void handleException(Exception e) throws BackOffException { + if(e instanceof InstagramRateLimitException) { + LOGGER.warn("Received rate limit exception from Instagram, backing off. : {}", e); + this.backOffStrategy.backOff(); + } else if(e instanceof InstagramBadRequestException) { + LOGGER.error("Received Bad Requests exception form Instagram: {}", e); + ++this.consecutiveErrorCount; + } else { + LOGGER.error("Received Expection while attempting to poll Instagram: {}", e); + ++this.consecutiveErrorCount; } - this.isCompleted.set(true); } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9f296be7/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java index 30ddda4..e3e9399 100644 --- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java +++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java @@ -14,17 +14,19 @@ specific language governing permissions and limitations under the License. */ package org.apache.streams.instagram.provider; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Queues; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.instagram.InstagramConfigurator; -import org.apache.streams.instagram.InstagramUserInformationConfiguration; +import org.apache.streams.instagram.*; +import org.apache.streams.util.SerializationUtil; import org.jinstagram.entity.users.feed.MediaFeedData; import org.joda.time.DateTime; import java.math.BigInteger; +import java.util.Map; import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -36,17 +38,17 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public class InstagramRecentMediaProvider implements StreamsProvider { - private InstagramUserInformationConfiguration config; + private InstagramConfiguration config; private InstagramRecentMediaCollector dataCollector; protected Queue mediaFeedQueue; //exposed for testing private ExecutorService executorService; private AtomicBoolean isCompleted; public InstagramRecentMediaProvider() { - this(InstagramConfigurator.detectInstagramUserInformationConfiguration(StreamsConfigurator.config.getConfig("instagram"))); + this(InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram"))); } - public InstagramRecentMediaProvider(InstagramUserInformationConfiguration config) { + public InstagramRecentMediaProvider(InstagramConfiguration config) { this.config = config; this.mediaFeedQueue = Queues.newConcurrentLinkedQueue(); } @@ -62,6 +64,7 @@ public class InstagramRecentMediaProvider implements StreamsProvider { * EXPOSED FOR TESTING * @return */ + @VisibleForTesting protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() { return new InstagramRecentMediaCollector(this.mediaFeedQueue, this.config); } @@ -112,4 +115,26 @@ public class InstagramRecentMediaProvider implements StreamsProvider { this.executorService = null; } } + + /** + * Add default start and stop points if necessary. + */ + private void updateUserInfoList() { + UsersInfo usersInfo = this.config.getUsersInfo(); + if(usersInfo.getDefaultAfterDate() == null && usersInfo.getDefaultBeforeDate() == null) { + return; + } + DateTime defaultAfterDate = usersInfo.getDefaultAfterDate(); + DateTime defaultBeforeDate = usersInfo.getDefaultBeforeDate(); + for(UserId user : usersInfo.getUserIds()) { + if(defaultAfterDate != null && user.getAfterDate() == null) { + user.setAfterDate(defaultAfterDate); + } + if(defaultBeforeDate != null && user.getBeforeDate() == null) { + user.setBeforeDate(defaultBeforeDate); + } + } + } + + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9f296be7/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json index 7b894a2..c662660 100644 --- a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json +++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json @@ -5,9 +5,13 @@ "javaType" : "org.apache.streams.instagram.InstagramConfiguration", "javaInterfaces": ["java.io.Serializable"], "properties": { - "clientId": { - "type": "string", - "description": "Your Instagram Client Id" + "clientIds": { + "type": "array", + "uniqueItems": true, + "items": { + "type": "string" + }, + "description": "Your Instagram Client Ids" }, "usersInfo": { "type": "object", @@ -23,12 +27,12 @@ }, "defaultAfterDate": { "type": "string", - "format": "datetime", + "format": "date-time", "description": "If the api allows to gather data by date range, this date will be used as the start of the range for the request for all users that don't have date ranges specified. If this is null it will pull from the earliest possible time" }, "defaultBeforeDate": { "type": "string", - "format": "datetime", + "format": "date-time", "description": "If the api allows to gather data by date range, this date will be used as the end of the range for the request for all users that don't have date ranges specified. If this is null it will pull till current time." } } @@ -44,12 +48,12 @@ }, "afterDate": { "type": "string", - "format": "datetime", + "format": "date-time", "description": "If the api allows to gather data by date range, this date will be used as the start of the range for the request for this user. If this is null it will use the defaultBeforeDate." }, "beforeDate": { "type": "string", - "format": "datetime", + "format": "date-time", "description": "If the api allows to gather data by date range, this date will be used as the end of the range for the request for this user.. If this is null it will use the defaultAfterDate." } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9f296be7/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java index 0225b40..74b5139 100644 --- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java +++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java @@ -14,16 +14,25 @@ specific language governing permissions and limitations under the License. */ package org.apache.streams.instagram.provider; +import com.carrotsearch.randomizedtesting.RandomizedTest; +import com.carrotsearch.randomizedtesting.annotations.Repeat; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Queues; import com.google.common.collect.Sets; +import org.apache.streams.instagram.InstagramConfiguration; import org.apache.streams.instagram.InstagramUserInformationConfiguration; +import org.apache.streams.instagram.UserId; +import org.apache.streams.instagram.UsersInfo; +import org.apache.streams.util.ConcurentYieldTillSuccessQueue; import org.jinstagram.Instagram; import org.jinstagram.entity.common.Pagination; import org.jinstagram.entity.users.feed.MediaFeed; import org.jinstagram.entity.users.feed.MediaFeedData; +import org.jinstagram.exceptions.InstagramBadRequestException; import org.jinstagram.exceptions.InstagramException; +import org.jinstagram.exceptions.InstagramRateLimitException; +import org.joda.time.DateTime; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -34,91 +43,63 @@ import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import static org.junit.Assert.*; -import static org.mockito.Matchers.any; +import static org.mockito.Matchers.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; /** * Tests for {@link org.apache.streams.instagram.provider.InstagramRecentMediaCollector} */ -public class InstagramRecentMediaCollectorTest { +public class InstagramRecentMediaCollectorTest extends RandomizedTest { private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollectorTest.class); private int expectedDataCount = 0; - private long randomSeed = System.currentTimeMillis(); - private Random rand = new Random(randomSeed); - private Map pageMap = Maps.newHashMap(); @Test public void testHandleInstagramException1() throws InstagramException { - InstagramException ie = mock(InstagramException.class); + InstagramException ie = mock(InstagramRateLimitException.class); when(ie.getRemainingLimitStatus()).thenReturn(1); final String message = "Test Message"; when(ie.getMessage()).thenReturn(message); - InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue(), new InstagramUserInformationConfiguration()); + InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurentYieldTillSuccessQueue(), new InstagramConfiguration()); try { - collector.handleInstagramException(ie, 1); - fail("Expected RuntimeException to be thrown"); - } catch (InstagramException rte) { -// assertTrue(rte.getMessage().contains("Mock for InstagramException")); - assertEquals(message, rte.getMessage()); + long startTime = System.currentTimeMillis(); + collector.handleException(ie); + long endTime = System.currentTimeMillis(); + assertTrue(2000 <= endTime - startTime); + startTime = System.currentTimeMillis(); + collector.handleException(ie); + endTime = System.currentTimeMillis(); + assertTrue(4000 <= endTime - startTime); + } catch (Exception e) { + fail("Should not have thrown an exception."); } } - @Test - public void testHandleInstagramException2() throws InstagramException{ - InstagramException ie = mock(InstagramException.class); - when(ie.getRemainingLimitStatus()).thenReturn(0); - InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue(), new InstagramUserInformationConfiguration()); - long startTime = System.currentTimeMillis(); - collector.handleInstagramException(ie, 1); - long endTime = System.currentTimeMillis(); - LOGGER.debug("Slept for {} ms", startTime - endTime); - assertTrue(endTime - startTime >= 4000); //allow for 1 sec of error - startTime = System.currentTimeMillis(); - collector.handleInstagramException(ie, 2); - endTime = System.currentTimeMillis(); - LOGGER.debug("Slept for {} ms", startTime - endTime); - assertTrue(endTime - startTime >= 24000); //allow for 1 sec of error - } - - @Test - public void testGetUserIds() { - InstagramUserInformationConfiguration config = new InstagramUserInformationConfiguration(); - List userIds = Lists.newLinkedList(); - userIds.add("1"); - userIds.add("2"); - userIds.add("3"); - userIds.add("4"); - userIds.add("abcdefg"); - config.setUserIds(userIds); - InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue(), config); - - Set expected = Sets.newHashSet(); - expected.add(1L); - expected.add(2L); - expected.add(3L); - expected.add(4L); - - assertEquals(expected, collector.getUserIds()); - } @Test + @Repeat(iterations = 3) public void testRun() { - Queue data = Queues.newConcurrentLinkedQueue(); - InstagramUserInformationConfiguration config = new InstagramUserInformationConfiguration(); - List userIds = Lists.newLinkedList(); - userIds.add("1"); - userIds.add("2"); - userIds.add("3"); - userIds.add("4"); - config.setUserIds(userIds); - InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(data, config); - collector.setInstagramClient(createMockInstagramClient()); + this.expectedDataCount = 0; + Queue data = new ConcurentYieldTillSuccessQueue(); + InstagramConfiguration config = new InstagramConfiguration(); + UsersInfo usersInfo = new UsersInfo(); + config.setUsersInfo(usersInfo); + Set users = creatUsers(randomIntBetween(0, 100)); + usersInfo.setUserIds(users); + + final Instagram mockInstagram = createMockInstagramClient(); + InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(data, config) { + @Override + protected Instagram getNextInstagramClient() { + return mockInstagram; + } + }; + assertFalse(collector.isCompleted()); collector.run(); - LOGGER.debug("Random seed == {}", randomSeed); - assertEquals("Random Seed == " + randomSeed, this.expectedDataCount, data.size()); + assertTrue(collector.isCompleted()); + assertEquals(this.expectedDataCount, data.size()); } private Instagram createMockInstagramClient() { @@ -127,12 +108,19 @@ public class InstagramRecentMediaCollectorTest { final InstagramException mockException = mock(InstagramException.class); when(mockException.getRemainingLimitStatus()).thenReturn(-1); when(mockException.getMessage()).thenReturn("MockInstagramException message"); - when(instagramClient.getRecentMediaFeed(any(Long.class))).thenAnswer(new Answer() { + when(instagramClient.getRecentMediaFeed(anyLong(), anyInt(), anyString(), anyString(), any(Date.class), any(Date.class))).thenAnswer(new Answer() { @Override public MediaFeed answer(InvocationOnMock invocationOnMock) throws Throwable { - long param = (Long) invocationOnMock.getArguments()[0]; - if (param == 2L) { - throw mockException; + if (randomInt(20) == 0) { //5% throw exceptions + int type = randomInt(4); + if (type == 0) + throw mock(InstagramRateLimitException.class); + else if (type == 1) + throw mock(InstagramBadRequestException.class); + else if (type == 2) + throw mock(InstagramException.class); + else + throw new Exception(); } else { return createRandomMockMediaFeed(); } @@ -150,11 +138,27 @@ public class InstagramRecentMediaCollectorTest { return instagramClient; } + private Set creatUsers(int numUsers) { + Set users = Sets.newHashSet(); + for(int i=0; i < numUsers; ++i) { + UserId user = new UserId(); + user.setUserId(Integer.toString(randomInt())); + if(randomInt(2) == 0) { + user.setAfterDate(DateTime.now().minusSeconds(randomIntBetween(0, 1000))); + } + if(randomInt(2) == 0) { + user.setBeforeDate(DateTime.now()); + } + users.add(user); + } + return users; + } + private MediaFeed createRandomMockMediaFeed() throws InstagramException { MediaFeed feed = mock(MediaFeed.class); - when(feed.getData()).thenReturn(createData(this.rand.nextInt(100))); + when(feed.getData()).thenReturn(createData(randomInt(100))); Pagination pagination = mock(Pagination.class); - if(this.rand.nextInt(2) == 0) { + if(randomInt(2) == 0) { when(pagination.hasNextPage()).thenReturn(true); } else { when(pagination.hasNextPage()).thenReturn(false); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9f296be7/streams-util/src/main/java/org/apache/streams/util/ConcurentYieldTillSuccessQueue.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/ConcurentYieldTillSuccessQueue.java b/streams-util/src/main/java/org/apache/streams/util/ConcurentYieldTillSuccessQueue.java new file mode 100644 index 0000000..e438f54 --- /dev/null +++ b/streams-util/src/main/java/org/apache/streams/util/ConcurentYieldTillSuccessQueue.java @@ -0,0 +1,27 @@ +package org.apache.streams.util; + +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * A {@link java.util.concurrent.ConcurrentLinkedQueue} implementation that causes thread yields when data is not + * successfully offered or polled. + */ +public class ConcurentYieldTillSuccessQueue extends ConcurrentLinkedQueue { + + @Override + public T poll() { + T item = null; + while(!super.isEmpty() && (item = super.poll()) == null) { + Thread.yield();; + } + return item; + } + + @Override + public boolean offer(T t) { + while(!super.offer(t)) { + Thread.yield(); + } + return true; + } +}