STREAMS-145 | Created abstract collectors/providers for Instagram
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0df06b60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0df06b60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0df06b60
Branch: refs/heads/pp
Commit: 0df06b6095b26531d58c0f12b9eaa0abe5ee54b5
Parents: a01b691
Author: Ryan Ebanks <rebanks@Informations-MacBook-Pro-3.local>
Authored: Thu Aug 14 11:21:51 2014 -0500
Committer: Ryan Ebanks <rebanks@Informations-MacBook-Pro-3.local>
Committed: Thu Aug 14 11:21:51 2014 -0500
----------------------------------------------------------------------
.../provider/InstagramAbstractProvider.java | 195 ++++++++++++++++++
.../provider/InstagramDataCollector.java | 129 ++++++++++++
.../provider/InstagramRecentMediaCollector.java | 171 ----------------
.../provider/InstagramRecentMediaProvider.java | 201 -------------------
.../InstagramRecentMediaCollector.java | 172 ++++++++++++++++
.../InstagramRecentMediaProvider.java | 201 +++++++++++++++++++
.../userinfo/InstagramUserInfoProvider.java | 82 ++++++++
.../InstagramRecentMediaCollectorTest.java | 159 ---------------
.../InstagramRecentMediaProviderTest.java | 175 ----------------
.../InstagramRecentMediaCollectorTest.java | 155 ++++++++++++++
.../InstagramRecentMediaProviderTest.java | 172 ++++++++++++++++
.../apache/streams/util/SerializationUtil.java | 4 +-
12 files changed, 1108 insertions(+), 708 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
new file mode 100644
index 0000000..3d35714
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
@@ -0,0 +1,195 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+package org.apache.streams.instagram.provider;
+
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.InstagramConfigurator;
+import org.apache.streams.instagram.User;
+import org.apache.streams.instagram.UsersInfo;
+import org.apache.streams.util.ComponentUtils;
+import org.apache.streams.util.SerializationUtil;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Base Instagram {@link org.apache.streams.core.StreamsProvider}: runs a subclass-supplied {@link InstagramDataCollector} on a single-thread executor and hands out queued data in batches
+ */
+public abstract class InstagramAbstractProvider<T> implements StreamsProvider {
+
+ private InstagramConfiguration config; // set by the constructors; changed in place by the setters below
+ private InstagramDataCollector dataCollector; // created in startStream()
+ protected Queue<StreamsDatum> dataQueue; // created in prepare(); exposed for testing
+ private ExecutorService executorService; // created in startStream(), dropped in cleanUp()
+ private AtomicBoolean isCompleted; // created in prepare(); recomputed on every readCurrent()
+
+ public InstagramAbstractProvider() {
+ this.config = InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram")); // reads the "instagram" section of the global streams config
+ }
+
+ public InstagramAbstractProvider(InstagramConfiguration config) {
+ this.config = (InstagramConfiguration) SerializationUtil.cloneBySerialization(config); // keeps the clone, not the caller's instance
+ }
+
+ @Override
+ public void startStream() {
+ this.dataCollector = getInstagramDataCollector(); // the subclass decides which collector to run
+ this.executorService = Executors.newSingleThreadExecutor();
+ this.executorService.submit(this.dataCollector); // the collector runs on the executor's thread
+ }
+
+ /**
+ * Return the data collector to use to connect to instagram. Called from {@link #startStream()}.
+ * @return the collector that startStream() submits to its executor
+ */
+ protected abstract InstagramDataCollector getInstagramDataCollector();
+
+
+ @Override
+ public StreamsResultSet readCurrent() {
+ Queue<StreamsDatum> batch = Queues.newConcurrentLinkedQueue(); // a fresh batch for every call
+ while(!this.dataQueue.isEmpty()) {
+ ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.dataQueue), batch);
+ }
+ this.isCompleted.set(batch.size() == 0 && this.dataQueue.isEmpty() && this.dataCollector.isCompleted()); // done only when nothing was read, the queue is empty and the collector reports completion
+ return new StreamsResultSet(batch);
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return null; // always null: no implementation in this class
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return null; // always null: no implementation in this class
+ }
+
+ @Override
+ public boolean isRunning() {
+ return !this.isCompleted.get(); // NOTE(review): isCompleted is only assigned in prepare()
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ this.dataQueue = Queues.newConcurrentLinkedQueue(); // configurationObject is not read here
+ this.isCompleted = new AtomicBoolean(false);
+ }
+
+ @Override
+ public void cleanUp() {
+ this.executorService.shutdown(); // NOTE(review): executorService is only assigned in startStream(); no null check here
+ try {
+ this.executorService.awaitTermination(5, TimeUnit.SECONDS); // result is ignored, so a timeout is not detected
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
+ } finally {
+ this.executorService = null;
+ }
+ }
+
+ /**
+ * Copy the default after/before dates onto every user that has none. NOTE(review): private and never called in this class.
+ */
+ private void updateUserInfoList() {
+ UsersInfo usersInfo = this.config.getUsersInfo(); // NOTE(review): not null-checked here, unlike ensureUsersInfo()
+ if(usersInfo.getDefaultAfterDate() == null && usersInfo.getDefaultBeforeDate() == null) {
+ return;
+ }
+ DateTime defaultAfterDate = usersInfo.getDefaultAfterDate();
+ DateTime defaultBeforeDate = usersInfo.getDefaultBeforeDate();
+ for(User user : usersInfo.getUsers()) {
+ if(defaultAfterDate != null && user.getAfterDate() == null) {
+ user.setAfterDate(defaultAfterDate);
+ }
+ if(defaultBeforeDate != null && user.getBeforeDate() == null) {
+ user.setBeforeDate(defaultBeforeDate);
+ }
+ }
+ }
+
+ /**
+ * Overrides the client id in the configuration.
+ * @param clientId client id to use
+ */
+ public void setInstagramClientId(String clientId) {
+ this.config.setClientId(clientId);
+ }
+
+ /**
+ * Overrides authorized user tokens in the configuration.
+ * @param tokenStrings tokens that replace the configured set; copied into a new HashSet
+ */
+ public void setAuthorizedUserTokens(Collection<String> tokenStrings) {
+ ensureUsersInfo(this.config).setAuthorizedTokens(Sets.newHashSet(tokenStrings));
+ }
+
+ /**
+ * Overrides the default before date in the configuration
+ * @param beforeDate default before date to store in the users info
+ */
+ public void setDefaultBeforeDate(DateTime beforeDate) {
+ ensureUsersInfo(this.config).setDefaultBeforeDate(beforeDate);
+ }
+
+ /**
+ * Overrides the default after date in the configuration
+ * @param afterDate default after date to store in the users info
+ */
+ public void setDefaultAfterDate(DateTime afterDate) {
+ ensureUsersInfo(this.config).setDefaultAfterDate(afterDate);
+ }
+
+ /**
+ * Overrides the users in the configuration and sets the after date for each user. A NULL DateTime implies
+ * pull data from as early as possible. If default before or after DateTimes are set, they will be applied to all
+ * NULL DateTimes (done by updateUserInfoList(); NOTE(review): confirm that method is actually invoked).
+ * @param usersWithAfterDate instagram user id mapped to that user's after date (may be NULL)
+ */
+ public void setUsersWithAfterDate(Map<String, DateTime> usersWithAfterDate) {
+ Set<User> users = Sets.newHashSet();
+ for(String userId : usersWithAfterDate.keySet()) {
+ User user = new User();
+ user.setUserId(userId);
+ user.setAfterDate(usersWithAfterDate.get(userId));
+ users.add(user);
+ }
+ ensureUsersInfo(this.config).setUsers(users);
+ }
+
+ private UsersInfo ensureUsersInfo(InstagramConfiguration config) { // returns the config's UsersInfo, creating and attaching an empty one when it is null
+ UsersInfo usersInfo = config.getUsersInfo();
+ if(usersInfo == null) {
+ usersInfo = new UsersInfo();
+ config.setUsersInfo(usersInfo);
+ }
+ return usersInfo;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java
new file mode 100644
index 0000000..4cfc282
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java
@@ -0,0 +1,129 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+package org.apache.streams.instagram.provider;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.User;
+import org.apache.streams.util.ComponentUtils;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
+import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
+import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger;
+import org.jinstagram.Instagram;
+import org.jinstagram.entity.common.Pagination;
+import org.jinstagram.entity.users.feed.MediaFeed;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.jinstagram.exceptions.InstagramBadRequestException;
+import org.jinstagram.exceptions.InstagramRateLimitException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Base {@link Runnable} that walks the configured users and delegates the per-user Instagram requests to subclasses.
+ * <p/>
+ * Subclasses are expected to skip a user whose data cannot be gathered and move on to the next user; an exception that
+ * escapes the per-user call ends the whole run (see {@link #run()}). An exponential back off strategy is provided for rate limits.
+ */
+public abstract class InstagramDataCollector<T> implements Runnable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(InstagramDataCollector.class);
+
+ protected Queue<StreamsDatum> dataQueue; // queue passed to the constructor; exposed for testing
+ private InstagramConfiguration config;
+ private AtomicBoolean isCompleted; // set to true at the end of run()
+ private SimpleTokenManager<InstagramOauthToken> tokenManger; // filled from the configured authorized tokens
+ protected int consecutiveErrorCount; // starts at 0; not modified in this class
+ protected BackOffStrategy backOffStrategy; // not invoked in this class; available to subclasses
+ private Instagram instagram; // single client instance, reused by getNextInstagramClient()
+
+
+ public InstagramDataCollector(Queue<StreamsDatum> queue, InstagramConfiguration config) {
+ this.dataQueue = queue;
+ this.config = config;
+ this.isCompleted = new AtomicBoolean(false);
+ this.tokenManger = new BasicTokenManger<InstagramOauthToken>();
+ for (String tokens : this.config.getUsersInfo().getAuthorizedTokens()) { // NOTE(review): assumes getUsersInfo() and getAuthorizedTokens() are non-null
+ this.tokenManger.addTokenToPool(new InstagramOauthToken(tokens));
+ }
+ this.consecutiveErrorCount = 0;
+ this.backOffStrategy = new ExponentialBackOffStrategy(2);
+ this.instagram = new Instagram(this.config.getClientId());
+ }
+
+
+ /**
+ * If there are authorized tokens available, it sets a new token for the client and returns
+ * the client. If there are no available tokens, it simply returns the client that was
+ * initialized in the constructor with client id.
+ * @return the shared client instance, with its access token replaced when a token was available
+ */
+ protected Instagram getNextInstagramClient() {
+ if(this.tokenManger.numAvailableTokens() > 0) {
+ this.instagram.setAccessToken(this.tokenManger.getNextAvailableToken());
+ }
+ return this.instagram;
+ }
+
+ protected void queueData(Collection<T> userData, String userId) { // converts each item and queues it; a null collection is only logged
+ if (userData == null) {
+ LOGGER.warn("User id, {}, returned a NULL data from instagram.", userId);
+ } else {
+ for (T data : userData) {
+ ComponentUtils.offerUntilSuccess(convertToStreamsDatum(data), this.dataQueue);
+ }
+ }
+ }
+
+ /**
+ * @return true once {@link #run()} has finished, whether it ended normally or stopped on an exception.
+ */
+ public boolean isCompleted() {
+ return this.isCompleted.get();
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (User user : this.config.getUsersInfo().getUsers()) {
+ collectInstagramDataForUser(user);
+ }
+ } catch (Exception e) { // one catch around the whole loop: the first escaping exception skips all remaining users
+ LOGGER.error("Shutting down InstagramCollector. Exception occured: {}", e.getMessage()); // NOTE(review): only the message is logged; the stack trace is dropped
+ }
+ this.isCompleted.set(true);
+ }
+
+ /**
+ * Pull instagram data for a user and queues the resulting data.
+ * @param user user whose data should be collected
+ * NOTE(review): no checked exception is declared; unchecked ones are caught and logged by {@link #run()}.
+ */
+ protected abstract void collectInstagramDataForUser(User user);
+
+ /**
+ * Takes an Instagram Object and sets it as the document of a streams datum and sets the id of the streams datum.
+ * @param item Instagram object to wrap
+ * @return the datum that {@link #queueData} places on the data queue
+ */
+ protected abstract StreamsDatum convertToStreamsDatum(T item);
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
deleted file mode 100644
index 8dc7a82..0000000
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance *
-http://www.apache.org/licenses/LICENSE-2.0 *
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License. */
-package org.apache.streams.instagram.provider;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.streams.instagram.InstagramConfiguration;
-import org.apache.streams.instagram.User;
-import org.apache.streams.util.ComponentUtils;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
-import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
-import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger;
-import org.jinstagram.Instagram;
-import org.jinstagram.entity.common.Pagination;
-import org.jinstagram.entity.users.feed.MediaFeed;
-import org.jinstagram.entity.users.feed.MediaFeedData;
-import org.jinstagram.exceptions.InstagramBadRequestException;
-import org.jinstagram.exceptions.InstagramRateLimitException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Executes on all of the Instagram requests to collect the media feed data.
- * <p/>
- * If errors/exceptions occur when trying to gather data for a particular user, that user is skipped and the collector
- * move on to the next user. If a rate limit exception occurs it employs an exponential back off strategy.
- */
-public class InstagramRecentMediaCollector implements Runnable {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollector.class);
- protected static final int MAX_ATTEMPTS = 5;
- protected static final int SLEEP_SECS = 5; //5 seconds
-
- protected Queue<MediaFeedData> dataQueue; //exposed for testing
- private InstagramConfiguration config;
- private AtomicBoolean isCompleted;
- private SimpleTokenManager<InstagramOauthToken> tokenManger;
- private int consecutiveErrorCount;
- private BackOffStrategy backOffStrategy;
- private Instagram instagram;
-
-
- public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramConfiguration config) {
- this.dataQueue = queue;
- this.config = config;
- this.isCompleted = new AtomicBoolean(false);
- this.tokenManger = new BasicTokenManger<InstagramOauthToken>();
- for (String tokens : this.config.getUsersInfo().getAuthorizedTokens()) {
- this.tokenManger.addTokenToPool(new InstagramOauthToken(tokens));
- }
- this.consecutiveErrorCount = 0;
- this.backOffStrategy = new ExponentialBackOffStrategy(2);
- this.instagram = new Instagram(this.config.getClientId());
- }
-
-
- /**
- * If there are authorized tokens available, it sets a new token for the client and returns
- * the client. If there are no available tokens, it simply returns the client that was
- * initialized in the constructor with client id.
- * @return
- */
- @VisibleForTesting
- protected Instagram getNextInstagramClient() {
- if(this.tokenManger.numAvailableTokens() > 0) {
- this.instagram.setAccessToken(this.tokenManger.getNextAvailableToken());
- }
- return this.instagram;
- }
-
- private void queueData(MediaFeed userFeed, String userId) {
- if (userFeed == null) {
- LOGGER.warn("User id, {}, returned a NULL media feed from instagram.", userId);
- } else {
- for (MediaFeedData data : userFeed.getData()) {
- ComponentUtils.offerUntilSuccess(data, this.dataQueue);
- }
- }
- }
-
- /**
- * @return true when the collector has queued all of the available media feed data for the provided users.
- */
- public boolean isCompleted() {
- return this.isCompleted.get();
- }
-
- @Override
- public void run() {
- try {
- for (User user : this.config.getUsersInfo().getUsers()) {
- collectMediaFeed(user);
- }
- } catch (Exception e) {
- LOGGER.error("Shutting down InstagramCollector. Exception occured: {}", e.getMessage());
- }
- this.isCompleted.set(true);
- }
-
- /**
- * Pull Recement Media for a user and queues the resulting data. Will try a single call 5 times before failing and
- * moving on to the next call or returning.
- * @param user
- * @throws Exception
- */
- @VisibleForTesting
- protected void collectMediaFeed(User user) throws Exception {
- Pagination pagination = null;
- do {
- int attempts = 0;
- boolean succesfullDataPull = false;
- while (!succesfullDataPull && attempts < MAX_ATTEMPTS) {
- ++attempts;
- MediaFeed feed = null;
- try {
- if (pagination == null) {
- feed = getNextInstagramClient().getRecentMediaFeed(Long.valueOf(user.getUserId()),
- 0,
- null,
- null,
- user.getBeforeDate() == null ? null : user.getBeforeDate().toDate(),
- user.getAfterDate() == null ? null : user.getAfterDate().toDate());
- } else {
- feed = getNextInstagramClient().getRecentMediaNextPage(pagination);
- }
- } catch (Exception e) {
- if(e instanceof InstagramRateLimitException) {
- LOGGER.warn("Received rate limit exception from Instagram, backing off. : {}", e);
- this.backOffStrategy.backOff();
- } else if(e instanceof InstagramBadRequestException) {
- LOGGER.error("Received Bad Requests exception form Instagram: {}", e);
- attempts = MAX_ATTEMPTS; //don't repeat bad requests.
- ++this.consecutiveErrorCount;
- } else {
- LOGGER.error("Received Expection while attempting to poll Instagram: {}", e);
- ++this.consecutiveErrorCount;
- }
- if(this.consecutiveErrorCount > Math.max(this.tokenManger.numAvailableTokens(), MAX_ATTEMPTS*2)) {
- throw new Exception("InstagramCollector failed to successfully connect to instagram on "+this.consecutiveErrorCount+" attempts.");
- }
- }
- if(succesfullDataPull = feed != null) {
- this.consecutiveErrorCount = 0;
- this.backOffStrategy.reset();
- pagination = feed.getPagination();
- queueData(feed, user.getUserId());
- }
- }
- if(!succesfullDataPull) {
- LOGGER.error("Failed to get data from instagram for user id, {}, skipping user.", user.getUserId());
- }
- } while (pagination != null && pagination.hasNextPage());
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
deleted file mode 100644
index 874fd19..0000000
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance *
-http://www.apache.org/licenses/LICENSE-2.0 *
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License. */
-package org.apache.streams.instagram.provider;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.instagram.*;
-import org.apache.streams.util.ComponentUtils;
-import org.apache.streams.util.SerializationUtil;
-import org.jinstagram.auth.model.Token;
-import org.jinstagram.entity.users.feed.MediaFeedData;
-import org.joda.time.DateTime;
-
-import java.math.BigInteger;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Instagram {@link org.apache.streams.core.StreamsProvider} that provides the recent media data for a group of users
- */
-public class InstagramRecentMediaProvider implements StreamsProvider {
-
- private InstagramConfiguration config;
- private InstagramRecentMediaCollector dataCollector;
- protected Queue<MediaFeedData> mediaFeedQueue; //exposed for testing
- private ExecutorService executorService;
- private AtomicBoolean isCompleted;
-
- public InstagramRecentMediaProvider() {
- this.config = InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram"));
- }
-
- public InstagramRecentMediaProvider(InstagramConfiguration config) {
- this.config = (InstagramConfiguration) SerializationUtil.cloneBySerialization(config);
- }
-
- @Override
- public void startStream() {
- this.dataCollector = getInstagramRecentMediaCollector();
- this.executorService = Executors.newSingleThreadExecutor();
- this.executorService.submit(this.dataCollector);
- }
-
- /**
- * EXPOSED FOR TESTING
- * @return
- */
- @VisibleForTesting
- protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
- this.updateUserInfoList();
- return new InstagramRecentMediaCollector(this.mediaFeedQueue, this.config);
- }
-
-
- @Override
- public StreamsResultSet readCurrent() {
- Queue<StreamsDatum> batch = Queues.newConcurrentLinkedQueue();
- MediaFeedData data = null;
- while(!this.mediaFeedQueue.isEmpty()) {
- data = ComponentUtils.pollWhileNotEmpty(this.mediaFeedQueue);
- batch.add(new StreamsDatum(data, data.getId()));
- }
- this.isCompleted.set(batch.size() == 0 && this.mediaFeedQueue.isEmpty() && this.dataCollector.isCompleted());
- return new StreamsResultSet(batch);
- }
-
- @Override
- public StreamsResultSet readNew(BigInteger sequence) {
- return null;
- }
-
- @Override
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- return null;
- }
-
- @Override
- public boolean isRunning() {
- return !this.isCompleted.get();
- }
-
- @Override
- public void prepare(Object configurationObject) {
- this.mediaFeedQueue = Queues.newConcurrentLinkedQueue();
- this.isCompleted = new AtomicBoolean(false);
- }
-
- @Override
- public void cleanUp() {
- this.executorService.shutdown();
- try {
- this.executorService.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- } finally {
- this.executorService = null;
- }
- }
-
- /**
- * Add default start and stop points if necessary.
- */
- private void updateUserInfoList() {
- UsersInfo usersInfo = this.config.getUsersInfo();
- if(usersInfo.getDefaultAfterDate() == null && usersInfo.getDefaultBeforeDate() == null) {
- return;
- }
- DateTime defaultAfterDate = usersInfo.getDefaultAfterDate();
- DateTime defaultBeforeDate = usersInfo.getDefaultBeforeDate();
- for(User user : usersInfo.getUsers()) {
- if(defaultAfterDate != null && user.getAfterDate() == null) {
- user.setAfterDate(defaultAfterDate);
- }
- if(defaultBeforeDate != null && user.getBeforeDate() == null) {
- user.setBeforeDate(defaultBeforeDate);
- }
- }
- }
-
- /**
- * Overrides the client id in the configuration.
- * @param clientId client id to use
- */
- public void setInstagramClientId(String clientId) {
- this.config.setClientId(clientId);
- }
-
- /**
- * Overrides authroized user tokens in the configuration.
- * @param tokenStrings
- */
- public void setAuthorizedUserTokens(Collection<String> tokenStrings) {
- ensureUsersInfo(this.config).setAuthorizedTokens(Sets.newHashSet(tokenStrings));
- }
-
- /**
- * Overrides the default before date in the configuration
- * @param beforeDate
- */
- public void setDefaultBeforeDate(DateTime beforeDate) {
- ensureUsersInfo(this.config).setDefaultBeforeDate(beforeDate);
- }
-
- /**
- * Overrides the default after date in the configuration
- * @param afterDate
- */
- public void setDefaultAfterDate(DateTime afterDate) {
- ensureUsersInfo(this.config).setDefaultAfterDate(afterDate);
- }
-
- /**
- * Overrides the users in the configuration and sets the after date for each user. A NULL DateTime implies
- * pull data from as early as possible. If default before or after DateTimes are set, they will applied to all
- * NULL DateTimes.
- * @param usersWithAfterDate instagram user id mapped to BeforeDate time
- */
- public void setUsersWithAfterDate(Map<String, DateTime> usersWithAfterDate) {
- Set<User> users = Sets.newHashSet();
- for(String userId : usersWithAfterDate.keySet()) {
- User user = new User();
- user.setUserId(userId);
- user.setAfterDate(usersWithAfterDate.get(userId));
- users.add(user);
- }
- ensureUsersInfo(this.config).setUsers(users);
- }
-
- private UsersInfo ensureUsersInfo(InstagramConfiguration config) {
- UsersInfo usersInfo = config.getUsersInfo();
- if(usersInfo == null) {
- usersInfo = new UsersInfo();
- config.setUsersInfo(usersInfo);
- }
- return usersInfo;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
new file mode 100644
index 0000000..952e3bf
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
@@ -0,0 +1,172 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+package org.apache.streams.instagram.provider.recentmedia;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.User;
+import org.apache.streams.instagram.provider.InstagramOauthToken;
+import org.apache.streams.util.ComponentUtils;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
+import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
+import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger;
+import org.jinstagram.Instagram;
+import org.jinstagram.entity.common.Pagination;
+import org.jinstagram.entity.users.feed.MediaFeed;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.jinstagram.exceptions.InstagramBadRequestException;
+import org.jinstagram.exceptions.InstagramRateLimitException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Executes on all of the Instagram requests to collect the media feed data.
+ * <p/>
+ * If errors/exceptions occur when trying to gather data for a particular user, that user is skipped and the collector
+ * moves on to the next user. If a rate limit exception occurs, it employs an exponential back-off strategy.
+ */
+public class InstagramRecentMediaCollector implements Runnable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollector.class);
+ protected static final int MAX_ATTEMPTS = 5;
+ protected static final int SLEEP_SECS = 5; //5 seconds
+
+ protected Queue<MediaFeedData> dataQueue; //exposed for testing
+ private InstagramConfiguration config;
+ private AtomicBoolean isCompleted;
+ private SimpleTokenManager<InstagramOauthToken> tokenManger;
+ private int consecutiveErrorCount;
+ private BackOffStrategy backOffStrategy;
+ private Instagram instagram;
+
+
+ public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramConfiguration config) {
+ this.dataQueue = queue;
+ this.config = config;
+ this.isCompleted = new AtomicBoolean(false);
+ this.tokenManger = new BasicTokenManger<InstagramOauthToken>();
+ for (String tokens : this.config.getUsersInfo().getAuthorizedTokens()) {
+ this.tokenManger.addTokenToPool(new InstagramOauthToken(tokens));
+ }
+ this.consecutiveErrorCount = 0;
+ this.backOffStrategy = new ExponentialBackOffStrategy(2);
+ this.instagram = new Instagram(this.config.getClientId());
+ }
+
+
+ /**
+ * If there are authorized tokens available, it sets a new token for the client and returns
+ * the client. If there are no available tokens, it simply returns the client that was
+ * initialized in the constructor with client id.
+ * @return
+ */
+ @VisibleForTesting
+ protected Instagram getNextInstagramClient() {
+ if(this.tokenManger.numAvailableTokens() > 0) {
+ this.instagram.setAccessToken(this.tokenManger.getNextAvailableToken());
+ }
+ return this.instagram;
+ }
+
+ private void queueData(MediaFeed userFeed, String userId) {
+ if (userFeed == null) {
+ LOGGER.warn("User id, {}, returned a NULL media feed from instagram.", userId);
+ } else {
+ for (MediaFeedData data : userFeed.getData()) {
+ ComponentUtils.offerUntilSuccess(data, this.dataQueue);
+ }
+ }
+ }
+
+ /**
+ * @return true when the collector has queued all of the available media feed data for the provided users.
+ */
+ public boolean isCompleted() {
+ return this.isCompleted.get();
+ }
+
+ @Override
+ public void run() {
+ // Collect every configured user; completion is flagged in finally so the provider can always terminate.
+ try {
+ for (User user : this.config.getUsersInfo().getUsers()) {
+ collectMediaFeed(user);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Shutting down InstagramCollector. Exception occured: {}", e.getMessage(), e);
+ } finally { this.isCompleted.set(true); }
+ }
+
+ /**
+ * Pulls the recent media for a user, page by page, and queues the resulting data. Each call is tried up to
+ * MAX_ATTEMPTS times; if a page still cannot be fetched, the remainder of this user's feed is skipped.
+ * @param user user whose media feed is collected; supplies the user id and the optional before/after dates
+ * @throws Exception when more than max(available tokens, MAX_ATTEMPTS * 2) consecutive errors occur
+ */
+ @VisibleForTesting
+ protected void collectMediaFeed(User user) throws Exception {
+ Pagination pagination = null;
+ do {
+ boolean succesfullDataPull = false;
+ for (int attempts = 1; !succesfullDataPull && attempts <= MAX_ATTEMPTS; ++attempts) {
+ MediaFeed feed = null;
+ try {
+ if (pagination == null) {
+ feed = getNextInstagramClient().getRecentMediaFeed(Long.valueOf(user.getUserId()),
+ 0,
+ null,
+ null,
+ user.getBeforeDate() == null ? null : user.getBeforeDate().toDate(),
+ user.getAfterDate() == null ? null : user.getAfterDate().toDate());
+ } else {
+ feed = getNextInstagramClient().getRecentMediaNextPage(pagination);
+ }
+ } catch (Exception e) {
+ if(e instanceof InstagramRateLimitException) {
+ LOGGER.warn("Received rate limit exception from Instagram, backing off. : {}", e);
+ this.backOffStrategy.backOff();
+ } else if(e instanceof InstagramBadRequestException) {
+ LOGGER.error("Received Bad Requests exception form Instagram: {}", e);
+ attempts = MAX_ATTEMPTS; //don't repeat bad requests.
+ ++this.consecutiveErrorCount;
+ } else {
+ LOGGER.error("Received Expection while attempting to poll Instagram: {}", e);
+ ++this.consecutiveErrorCount;
+ }
+ if(this.consecutiveErrorCount > Math.max(this.tokenManger.numAvailableTokens(), MAX_ATTEMPTS*2)) {
+ throw new Exception("InstagramCollector failed to successfully connect to instagram on "+this.consecutiveErrorCount+" attempts.");
+ }
+ }
+ succesfullDataPull = feed != null;
+ if(succesfullDataPull) {
+ this.consecutiveErrorCount = 0;
+ this.backOffStrategy.reset();
+ pagination = feed.getPagination();
+ queueData(feed, user.getUserId());
+ }
+ }
+ if(!succesfullDataPull) {
+ LOGGER.error("Failed to get data from instagram for user id, {}, skipping user.", user.getUserId());
+ return; // pagination still points at the failed page; looping again would retry that same page endlessly
+ }
+ } while (pagination != null && pagination.hasNextPage());
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
new file mode 100644
index 0000000..39acbd4
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
@@ -0,0 +1,201 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+package org.apache.streams.instagram.provider.recentmedia;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.instagram.*;
+import org.apache.streams.instagram.provider.recentmedia.InstagramRecentMediaCollector;
+import org.apache.streams.util.ComponentUtils;
+import org.apache.streams.util.SerializationUtil;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Instagram {@link org.apache.streams.core.StreamsProvider} that provides the recent media data for a group of users
+ */
+public class InstagramRecentMediaProvider implements StreamsProvider {
+
+ private InstagramConfiguration config;
+ private InstagramRecentMediaCollector dataCollector;
+ protected Queue<MediaFeedData> mediaFeedQueue; //exposed for testing
+ private ExecutorService executorService;
+ private AtomicBoolean isCompleted;
+
+ public InstagramRecentMediaProvider() {
+ this.config = InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram"));
+ }
+
+ public InstagramRecentMediaProvider(InstagramConfiguration config) {
+ this.config = (InstagramConfiguration) SerializationUtil.cloneBySerialization(config);
+ }
+
+ @Override
+ public void startStream() {
+ this.dataCollector = getInstagramRecentMediaCollector();
+ this.executorService = Executors.newSingleThreadExecutor();
+ this.executorService.submit(this.dataCollector);
+ }
+
+ /**
+ * EXPOSED FOR TESTING
+ * @return
+ */
+ @VisibleForTesting
+ protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
+ this.updateUserInfoList();
+ return new InstagramRecentMediaCollector(this.mediaFeedQueue, this.config);
+ }
+
+
+ @Override
+ public StreamsResultSet readCurrent() {
+ Queue<StreamsDatum> batch = Queues.newConcurrentLinkedQueue();
+ MediaFeedData data = null;
+ while(!this.mediaFeedQueue.isEmpty()) {
+ data = ComponentUtils.pollWhileNotEmpty(this.mediaFeedQueue);
+ batch.add(new StreamsDatum(data, data.getId()));
+ }
+ this.isCompleted.set(batch.size() == 0 && this.mediaFeedQueue.isEmpty() && this.dataCollector.isCompleted());
+ return new StreamsResultSet(batch);
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return null;
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return null;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return !this.isCompleted.get();
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ this.mediaFeedQueue = Queues.newConcurrentLinkedQueue();
+ this.isCompleted = new AtomicBoolean(false);
+ }
+
+ @Override
+ public void cleanUp() {
+ this.executorService.shutdown();
+ try {
+ this.executorService.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ } finally {
+ this.executorService = null;
+ }
+ }
+
+ /**
+ * Add default start and stop points if necessary.
+ */
+ private void updateUserInfoList() {
+ UsersInfo usersInfo = this.config.getUsersInfo();
+ if(usersInfo.getDefaultAfterDate() == null && usersInfo.getDefaultBeforeDate() == null) {
+ return;
+ }
+ DateTime defaultAfterDate = usersInfo.getDefaultAfterDate();
+ DateTime defaultBeforeDate = usersInfo.getDefaultBeforeDate();
+ for(User user : usersInfo.getUsers()) {
+ if(defaultAfterDate != null && user.getAfterDate() == null) {
+ user.setAfterDate(defaultAfterDate);
+ }
+ if(defaultBeforeDate != null && user.getBeforeDate() == null) {
+ user.setBeforeDate(defaultBeforeDate);
+ }
+ }
+ }
+
+ /**
+ * Overrides the client id in the configuration.
+ * @param clientId client id to use
+ */
+ public void setInstagramClientId(String clientId) {
+ this.config.setClientId(clientId);
+ }
+
+ /**
+ * Overrides authorized user tokens in the configuration.
+ * @param tokenStrings
+ */
+ public void setAuthorizedUserTokens(Collection<String> tokenStrings) {
+ ensureUsersInfo(this.config).setAuthorizedTokens(Sets.newHashSet(tokenStrings));
+ }
+
+ /**
+ * Overrides the default before date in the configuration
+ * @param beforeDate
+ */
+ public void setDefaultBeforeDate(DateTime beforeDate) {
+ ensureUsersInfo(this.config).setDefaultBeforeDate(beforeDate);
+ }
+
+ /**
+ * Overrides the default after date in the configuration
+ * @param afterDate
+ */
+ public void setDefaultAfterDate(DateTime afterDate) {
+ ensureUsersInfo(this.config).setDefaultAfterDate(afterDate);
+ }
+
+ /**
+ * Overrides the users in the configuration and sets the after date for each user. A NULL DateTime implies
+ * pull data from as early as possible. If default before or after DateTimes are set, they will be applied to all
+ * NULL DateTimes.
+ * @param usersWithAfterDate instagram user id mapped to that user's after date (may be NULL)
+ */
+ public void setUsersWithAfterDate(Map<String, DateTime> usersWithAfterDate) {
+ Set<User> users = Sets.newHashSet();
+ for(Map.Entry<String, DateTime> entry : usersWithAfterDate.entrySet()) { // one pass, no second lookup per key
+ User user = new User();
+ user.setUserId(entry.getKey());
+ user.setAfterDate(entry.getValue());
+ users.add(user);
+ }
+ ensureUsersInfo(this.config).setUsers(users);
+ }
+
+ private UsersInfo ensureUsersInfo(InstagramConfiguration config) {
+ UsersInfo usersInfo = config.getUsersInfo();
+ if(usersInfo == null) {
+ usersInfo = new UsersInfo();
+ config.setUsersInfo(usersInfo);
+ }
+ return usersInfo;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java
new file mode 100644
index 0000000..3bae50f
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java
@@ -0,0 +1,82 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+package org.apache.streams.instagram.provider.userinfo;
+
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.InstagramConfigurator;
+import org.apache.streams.util.SerializationUtil;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
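+ * Skeleton {@link org.apache.streams.core.StreamsProvider} for Instagram user information; streaming and read methods are not implemented yet.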
+ */
+public class InstagramUserInfoProvider implements StreamsProvider {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(InstagramUserInfoProvider.class);
+
+ private InstagramConfiguration config;
+ private AtomicBoolean isComplete;
+
+ public InstagramUserInfoProvider() {
+ this.config = InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram"));
+ }
+
+ public InstagramUserInfoProvider(InstagramConfiguration config) {
+ this.config = SerializationUtil.cloneBySerialization(config);
+ }
+
+ @Override
+ public void startStream() {
+
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+ return null;
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return null;
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return null;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return !this.isComplete.get(); // running means "not yet complete"; the flag starts as false in prepare()
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ this.isComplete = new AtomicBoolean(false);
+ }
+
+ @Override
+ public void cleanUp() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
deleted file mode 100644
index 1d234c9..0000000
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance *
-http://www.apache.org/licenses/LICENSE-2.0 *
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License. */
-package org.apache.streams.instagram.provider;
-
-import com.carrotsearch.randomizedtesting.RandomizedTest;
-import com.carrotsearch.randomizedtesting.annotations.Repeat;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
-import org.apache.streams.instagram.InstagramConfiguration;
-import org.apache.streams.instagram.InstagramUserInformationConfiguration;
-import org.apache.streams.instagram.User;
-import org.apache.streams.instagram.UsersInfo;
-import org.jinstagram.Instagram;
-import org.jinstagram.entity.common.Pagination;
-import org.jinstagram.entity.users.feed.MediaFeed;
-import org.jinstagram.entity.users.feed.MediaFeedData;
-import org.jinstagram.exceptions.InstagramBadRequestException;
-import org.jinstagram.exceptions.InstagramException;
-import org.jinstagram.exceptions.InstagramRateLimitException;
-import org.joda.time.DateTime;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.junit.Assert.*;
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for {@link org.apache.streams.instagram.provider.InstagramRecentMediaCollector}
- */
-public class InstagramRecentMediaCollectorTest extends RandomizedTest {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollectorTest.class);
-
- private int expectedDataCount = 0;
-
-
-
-
- @Test
- @Repeat(iterations = 3)
- public void testRun() {
- this.expectedDataCount = 0;
- Queue<MediaFeedData> data = Queues.newConcurrentLinkedQueue();
- InstagramConfiguration config = new InstagramConfiguration();
- UsersInfo usersInfo = new UsersInfo();
- config.setUsersInfo(usersInfo);
- Set<User> users = creatUsers(randomIntBetween(0, 100));
- usersInfo.setUsers(users);
-
- final Instagram mockInstagram = createMockInstagramClient();
- InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(data, config) {
- @Override
- protected Instagram getNextInstagramClient() {
- return mockInstagram;
- }
- };
- assertFalse(collector.isCompleted());
- collector.run();
- assertTrue(collector.isCompleted());
- assertEquals(this.expectedDataCount, data.size());
- }
-
- private Instagram createMockInstagramClient() {
- final Instagram instagramClient = mock(Instagram.class);
- try {
- final InstagramException mockException = mock(InstagramException.class);
- when(mockException.getRemainingLimitStatus()).thenReturn(-1);
- when(mockException.getMessage()).thenReturn("MockInstagramException message");
- when(instagramClient.getRecentMediaFeed(anyLong(), anyInt(), anyString(), anyString(), any(Date.class), any(Date.class))).thenAnswer(new Answer<MediaFeed>() {
- @Override
- public MediaFeed answer(InvocationOnMock invocationOnMock) throws Throwable {
- if (randomInt(20) == 0) { //5% throw exceptions
- int type = randomInt(4);
- if (type == 0)
- throw mock(InstagramRateLimitException.class);
- else if (type == 1)
- throw mock(InstagramBadRequestException.class);
- else if (type == 2)
- throw mock(InstagramException.class);
- else
- throw new Exception();
- } else {
- return createRandomMockMediaFeed();
- }
- }
- });
- when(instagramClient.getRecentMediaNextPage(any(Pagination.class))).thenAnswer(new Answer<MediaFeed>() {
- @Override
- public MediaFeed answer(InvocationOnMock invocationOnMock) throws Throwable {
- return createRandomMockMediaFeed();
- }
- });
- } catch (InstagramException ie) {
- fail("Failed to create mock instagram client.");
- }
- return instagramClient;
- }
-
- private Set<User> creatUsers(int numUsers) {
- Set<User> users = Sets.newHashSet();
- for(int i=0; i < numUsers; ++i) {
- User user = new User();
- user.setUserId(Integer.toString(randomInt()));
- if(randomInt(2) == 0) {
- user.setAfterDate(DateTime.now().minusSeconds(randomIntBetween(0, 1000)));
- }
- if(randomInt(2) == 0) {
- user.setBeforeDate(DateTime.now());
- }
- users.add(user);
- }
- return users;
- }
-
- private MediaFeed createRandomMockMediaFeed() throws InstagramException {
- MediaFeed feed = mock(MediaFeed.class);
- when(feed.getData()).thenReturn(createData(randomInt(100)));
- Pagination pagination = mock(Pagination.class);
- if(randomInt(2) == 0) {
- when(pagination.hasNextPage()).thenReturn(true);
- } else {
- when(pagination.hasNextPage()).thenReturn(false);
- }
- when(feed.getPagination()).thenReturn(pagination);
- return feed;
- }
-
- private List<MediaFeedData> createData(int size) {
- List<MediaFeedData> data = Lists.newLinkedList();
- for(int i=0; i < size; ++i) {
- data.add(mock(MediaFeedData.class));
- }
- this.expectedDataCount += size;
- return data;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
deleted file mode 100644
index 71e569b..0000000
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance *
-http://www.apache.org/licenses/LICENSE-2.0 *
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License. */
-package org.apache.streams.instagram.provider;
-
-import com.google.common.collect.Sets;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.instagram.InstagramConfiguration;
-import org.apache.streams.instagram.InstagramUserInformationConfiguration;
-import org.apache.streams.instagram.User;
-import org.apache.streams.instagram.UsersInfo;
-import org.jinstagram.InstagramConfig;
-import org.jinstagram.entity.users.feed.MediaFeedData;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashSet;
-import java.util.Random;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-/**
- *
- */
-public class InstagramRecentMediaProviderTest {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaProviderTest.class);
-
- @Test
- public void testStartStream() throws InterruptedException {
- final CountDownLatch latch = new CountDownLatch(1);
- final InstagramRecentMediaCollector collectorStub = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), createNonNullConfiguration()) {
-
- private volatile boolean isFinished = false;
-
- @Override
- public void run() {
- this.isFinished = true;
- latch.countDown();
- }
-
- @Override
- public boolean isCompleted() {
- return this.isFinished;
- }
- };
-
- InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(null) {
- @Override
- protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
- return collectorStub;
- }
- };
- provider.prepare(null);
- provider.startStream();
-
- latch.await();
- assertTrue(collectorStub.isCompleted());
- StreamsResultSet result = provider.readCurrent();
- assertNotNull(result);
- assertEquals(0, result.size());
- assertTrue(!provider.isRunning());
- try {
- provider.cleanUp();
- } catch (Throwable throwable){
- throwable.printStackTrace();
- fail("Error durring clean up");
- }
- }
-
- @Test
- public void testReadCurrent() {
- final long seed = System.nanoTime();
- final Random rand = new Random(seed);
- final CyclicBarrier test = new CyclicBarrier(2);
- final CyclicBarrier produce = new CyclicBarrier(2);
- final AtomicInteger batchCount = new AtomicInteger(0);
- InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(createNonNullConfiguration()) {
- @Override
- protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
- return new InstagramRecentMediaCollector(super.mediaFeedQueue, createNonNullConfiguration()) {
-
- private volatile boolean isFinished = false;
-
-
-
- public int getBatchCount() {
- return batchCount.get();
- }
-
- @Override
- public boolean isCompleted() {
- return isFinished;
- }
-
- @Override
- public void run() {
- int randInt = rand.nextInt(5);
- while(randInt != 0) {
- int batchSize = rand.nextInt(200);
- for(int i=0; i < batchSize; ++i) {
- while(!super.dataQueue.add(mock(MediaFeedData.class))) {
- Thread.yield();
- }
- }
- batchCount.set(batchSize);
- try {
- test.await();
- produce.await();
- } catch (InterruptedException ie ) {
- Thread.currentThread().interrupt();
- } catch (BrokenBarrierException bbe) {
- Thread.currentThread().interrupt();
- }
- randInt = rand.nextInt(5);
- }
- batchCount.set(0);
- isFinished = true;
- try {
- test.await();
- produce.await();
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- } catch (BrokenBarrierException bbe) {
- Thread.currentThread().interrupt();
- }
- }
-
- };
- }
- };
- provider.prepare(null);
- provider.startStream();
- while(provider.isRunning()) {
- try {
- test.await();
- assertEquals("Seed == "+seed, batchCount.get(), provider.readCurrent().size());
- produce.await();
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- } catch (BrokenBarrierException bbe) {
- Thread.currentThread().interrupt();
- }
-
- }
- }
-
- private InstagramConfiguration createNonNullConfiguration() {
- InstagramConfiguration configuration = new InstagramConfiguration();
- UsersInfo info = new UsersInfo();
- configuration.setUsersInfo(info);
- info.setUsers(new HashSet<User>());
- info.setAuthorizedTokens(new HashSet<String>());
- return configuration;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java
new file mode 100644
index 0000000..0020652
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java
@@ -0,0 +1,155 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the
+License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+package org.apache.streams.instagram.provider.recentmedia;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.User;
+import org.apache.streams.instagram.UsersInfo;
+import org.jinstagram.Instagram;
+import org.jinstagram.entity.common.Pagination;
+import org.jinstagram.entity.users.feed.MediaFeed;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.jinstagram.exceptions.InstagramBadRequestException;
+import org.jinstagram.exceptions.InstagramException;
+import org.jinstagram.exceptions.InstagramRateLimitException;
+import org.joda.time.DateTime;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link org.apache.streams.instagram.provider.recentmedia.InstagramRecentMediaCollector}.
+ * The collector is driven by a mocked client; every mocked media item handed out is tallied so
+ * the tally can be compared with the size of the collector's output queue.
+ */
+public class InstagramRecentMediaCollectorTest extends RandomizedTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollectorTest.class);
+
+    /** Number of mocked media items created by {@link #createData(int)} during the current run. */
+    private int expectedDataCount = 0;
+
+    /** Runs the collector over random users, then checks completion and the queued item count. */
+    @Test
+    @Repeat(iterations = 3)
+    public void testRun() {
+        this.expectedDataCount = 0;
+        Queue<MediaFeedData> data = Queues.newConcurrentLinkedQueue();
+        InstagramConfiguration config = new InstagramConfiguration();
+        UsersInfo usersInfo = new UsersInfo();
+        config.setUsersInfo(usersInfo);
+        usersInfo.setUsers(createUsers(randomIntBetween(0, 100)));
+
+        final Instagram mockInstagram = createMockInstagramClient();
+        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(data, config) {
+            @Override
+            protected Instagram getNextInstagramClient() {
+                return mockInstagram;
+            }
+        };
+        assertFalse(collector.isCompleted());
+        collector.run();
+        assertTrue(collector.isCompleted());
+        assertEquals(this.expectedDataCount, data.size());
+    }
+
+    /**
+     * Builds a mocked client: {@code getRecentMediaFeed} throws on about 5% of invocations and
+     * otherwise returns a random feed; {@code getRecentMediaNextPage} always returns a random feed.
+     */
+    private Instagram createMockInstagramClient() {
+        final Instagram instagramClient = mock(Instagram.class);
+        try {
+            when(instagramClient.getRecentMediaFeed(anyLong(), anyInt(), anyString(), anyString(), any(Date.class), any(Date.class))).thenAnswer(new Answer<MediaFeed>() {
+                @Override
+                public MediaFeed answer(InvocationOnMock invocationOnMock) throws Throwable {
+                    if (randomInt(20) != 0) { // the common case: no failure injected
+                        return createRandomMockMediaFeed();
+                    }
+                    int type = randomInt(4);
+                    if (type == 0) {
+                        throw mock(InstagramRateLimitException.class);
+                    } else if (type == 1) {
+                        throw mock(InstagramBadRequestException.class);
+                    } else if (type == 2) {
+                        throw mock(InstagramException.class);
+                    } else {
+                        throw new Exception();
+                    }
+                }
+            });
+            when(instagramClient.getRecentMediaNextPage(any(Pagination.class))).thenAnswer(new Answer<MediaFeed>() {
+                @Override
+                public MediaFeed answer(InvocationOnMock invocationOnMock) throws Throwable {
+                    return createRandomMockMediaFeed();
+                }
+            });
+        } catch (InstagramException ie) {
+            fail("Failed to create mock instagram client.");
+        }
+        return instagramClient;
+    }
+
+    /** Creates up to {@code numUsers} users with random ids and randomly assigned date bounds. */
+    private Set<User> createUsers(int numUsers) {
+        Set<User> users = Sets.newHashSet();
+        for (int i = 0; i < numUsers; ++i) {
+            User user = new User();
+            user.setUserId(Integer.toString(randomInt()));
+            if (randomInt(2) == 0) {
+                user.setAfterDate(DateTime.now().minusSeconds(randomIntBetween(0, 1000)));
+            }
+            if (randomInt(2) == 0) {
+                user.setBeforeDate(DateTime.now());
+            }
+            users.add(user);
+        }
+        return users;
+    }
+
+    /** Creates a mocked feed of random size whose pagination randomly reports a next page. */
+    private MediaFeed createRandomMockMediaFeed() throws InstagramException {
+        MediaFeed feed = mock(MediaFeed.class);
+        when(feed.getData()).thenReturn(createData(randomInt(100)));
+        Pagination pagination = mock(Pagination.class);
+        when(pagination.hasNextPage()).thenReturn(randomInt(2) == 0);
+        when(feed.getPagination()).thenReturn(pagination);
+        return feed;
+    }
+
+    /** Creates {@code size} mocked media items and adds {@code size} to the expected total. */
+    private List<MediaFeedData> createData(int size) {
+        List<MediaFeedData> data = Lists.newLinkedList();
+        for (int i = 0; i < size; ++i) {
+            data.add(mock(MediaFeedData.class));
+        }
+        this.expectedDataCount += size;
+        return data;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java
new file mode 100644
index 0000000..4a6396b
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java
@@ -0,0 +1,172 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the
+License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+package org.apache.streams.instagram.provider.recentmedia;
+
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.User;
+import org.apache.streams.instagram.UsersInfo;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link org.apache.streams.instagram.provider.recentmedia.InstagramRecentMediaProvider} using stubbed collectors.
+ */
+public class InstagramRecentMediaProviderTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaProviderTest.class);
+
+    /** Checks that a collector stub which finishes at once leaves the provider empty, stopped and cleanable. */
+    @Test
+    public void testStartStream() throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final InstagramRecentMediaCollector collectorStub = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), createNonNullConfiguration()) {
+
+            private volatile boolean isFinished = false;
+
+            @Override
+            public void run() {
+                this.isFinished = true;
+                latch.countDown();
+            }
+
+            @Override
+            public boolean isCompleted() {
+                return this.isFinished;
+            }
+        };
+
+        InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(null) {
+            @Override
+            protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
+                return collectorStub;
+            }
+        };
+        provider.prepare(null);
+        provider.startStream();
+
+        latch.await();
+        assertTrue(collectorStub.isCompleted());
+        StreamsResultSet result = provider.readCurrent();
+        assertNotNull(result);
+        assertEquals(0, result.size());
+        assertFalse(provider.isRunning());
+        try {
+            provider.cleanUp();
+        } catch (Throwable throwable) {
+            LOGGER.error("Error during clean up", throwable);
+            fail("Error during clean up");
+        }
+    }
+
+    /**
+     * Feeds the provider random batches from a stubbed collector and checks that each
+     * {@code readCurrent()} call returns as many items as the batch that was just queued.
+     *
+     * <p>Two barriers keep the collector and the test in lock step: both sides pass {@code test}
+     * once a batch is queued and {@code produce} once the test has read it.
+     */
+    @Test
+    public void testReadCurrent() {
+        final long seed = System.nanoTime();
+        final Random rand = new Random(seed);
+        final CyclicBarrier test = new CyclicBarrier(2);
+        final CyclicBarrier produce = new CyclicBarrier(2);
+        final AtomicInteger batchCount = new AtomicInteger(0);
+        InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(createNonNullConfiguration()) {
+            @Override
+            protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
+                return new InstagramRecentMediaCollector(super.mediaFeedQueue, createNonNullConfiguration()) {
+
+                    private volatile boolean isFinished = false;
+
+                    @Override
+                    public boolean isCompleted() {
+                        return isFinished;
+                    }
+
+                    @Override
+                    public void run() {
+                        int randInt = rand.nextInt(5);
+                        while (randInt != 0) {
+                            int batchSize = rand.nextInt(200);
+                            for (int i = 0; i < batchSize; ++i) {
+                                while (!super.dataQueue.add(mock(MediaFeedData.class))) {
+                                    Thread.yield();
+                                }
+                            }
+                            batchCount.set(batchSize);
+                            awaitTestThenProduce();
+                            randInt = rand.nextInt(5);
+                        }
+                        batchCount.set(0);
+                        isFinished = true;
+                        awaitTestThenProduce();
+                    }
+
+                    /** Signals that a batch is ready, then waits until the test has read it. */
+                    private void awaitTestThenProduce() {
+                        try {
+                            test.await();
+                            produce.await();
+                        } catch (InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                        } catch (BrokenBarrierException bbe) {
+                            Thread.currentThread().interrupt();
+                        }
+                    }
+                };
+            }
+        };
+        provider.prepare(null);
+        provider.startStream();
+        while (provider.isRunning()) {
+            try {
+                test.await();
+                assertEquals("Seed == " + seed, batchCount.get(), provider.readCurrent().size());
+                produce.await();
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                fail("Interrupted while synchronizing with the collector. Seed == " + seed);
+            } catch (BrokenBarrierException bbe) {
+                fail("Barrier broken while synchronizing with the collector. Seed == " + seed);
+            }
+        }
+    }
+
+    /** Builds a configuration whose users info holds empty, non-null user and token sets. */
+    private InstagramConfiguration createNonNullConfiguration() {
+        InstagramConfiguration configuration = new InstagramConfiguration();
+        UsersInfo info = new UsersInfo();
+        configuration.setUsersInfo(info);
+        info.setUsers(new HashSet<User>());
+        info.setAuthorizedTokens(new HashSet<String>());
+        return configuration;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java b/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
index 0975ea4..bc3583e 100644
--- a/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
+++ b/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
@@ -62,7 +62,7 @@ public class SerializationUtil {
}
- public static Object cloneBySerialization(Object obj) {
- return deserialize(serialize(obj));
+ public static <T> T cloneBySerialization(T obj) {
+ return (T) deserialize(serialize(obj)); // NOTE(review): unchecked cast; deserialize() is not visible here, confirm the round trip yields obj's own runtime type
}
}
|