streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject [16/41] git commit: STREAMS-145 | Created abstarct collectors/providers to instagram
Date Thu, 04 Sep 2014 17:12:04 GMT
STREAMS-145 | Created abstarct collectors/providers to instagram


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0df06b60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0df06b60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0df06b60

Branch: refs/heads/pp
Commit: 0df06b6095b26531d58c0f12b9eaa0abe5ee54b5
Parents: a01b691
Author: Ryan Ebanks <rebanks@Informations-MacBook-Pro-3.local>
Authored: Thu Aug 14 11:21:51 2014 -0500
Committer: Ryan Ebanks <rebanks@Informations-MacBook-Pro-3.local>
Committed: Thu Aug 14 11:21:51 2014 -0500

----------------------------------------------------------------------
 .../provider/InstagramAbstractProvider.java     | 195 ++++++++++++++++++
 .../provider/InstagramDataCollector.java        | 129 ++++++++++++
 .../provider/InstagramRecentMediaCollector.java | 171 ----------------
 .../provider/InstagramRecentMediaProvider.java  | 201 -------------------
 .../InstagramRecentMediaCollector.java          | 172 ++++++++++++++++
 .../InstagramRecentMediaProvider.java           | 201 +++++++++++++++++++
 .../userinfo/InstagramUserInfoProvider.java     |  82 ++++++++
 .../InstagramRecentMediaCollectorTest.java      | 159 ---------------
 .../InstagramRecentMediaProviderTest.java       | 175 ----------------
 .../InstagramRecentMediaCollectorTest.java      | 155 ++++++++++++++
 .../InstagramRecentMediaProviderTest.java       | 172 ++++++++++++++++
 .../apache/streams/util/SerializationUtil.java  |   4 +-
 12 files changed, 1108 insertions(+), 708 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
new file mode 100644
index 0000000..3d35714
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
@@ -0,0 +1,195 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+package org.apache.streams.instagram.provider;
+
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.InstagramConfigurator;
+import org.apache.streams.instagram.User;
+import org.apache.streams.instagram.UsersInfo;
+import org.apache.streams.util.ComponentUtils;
+import org.apache.streams.util.SerializationUtil;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Instagram {@link org.apache.streams.core.StreamsProvider} that provides the recent media data for a group of users
+ */
+public abstract class InstagramAbstractProvider<T> implements StreamsProvider {
+
+    private InstagramConfiguration config;
+    private InstagramDataCollector dataCollector;
+    protected Queue<StreamsDatum> dataQueue; //exposed for testing
+    private ExecutorService executorService;
+    private AtomicBoolean isCompleted;
+
+    public InstagramAbstractProvider() {
+        this.config = InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram"));
+    }
+
+    public InstagramAbstractProvider(InstagramConfiguration config) {
+        this.config = (InstagramConfiguration) SerializationUtil.cloneBySerialization(config);
+    }
+
+    @Override
+    public void startStream() {
+        this.dataCollector = getInstagramDataCollector();
+        this.executorService = Executors.newSingleThreadExecutor();
+        this.executorService.submit(this.dataCollector);
+    }
+
+    /**
+     * Return the data collector to use to connect to instagram.
+     * @return
+     */
+    protected abstract InstagramDataCollector getInstagramDataCollector();
+
+
+    @Override
+    public StreamsResultSet readCurrent() {
+        Queue<StreamsDatum> batch = Queues.newConcurrentLinkedQueue();
+        while(!this.dataQueue.isEmpty()) {
+            ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.dataQueue), batch);
+        }
+        this.isCompleted.set(batch.size() == 0 && this.dataQueue.isEmpty() && this.dataCollector.isCompleted());
+        return new StreamsResultSet(batch);
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return !this.isCompleted.get();
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        this.dataQueue = Queues.newConcurrentLinkedQueue();
+        this.isCompleted = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void cleanUp() {
+        this.executorService.shutdown();
+        try {
+            this.executorService.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        } finally {
+            this.executorService = null;
+        }
+    }
+
+    /**
+     * Add default start and stop points if necessary.
+     */
+    private void updateUserInfoList() {
+        UsersInfo usersInfo = this.config.getUsersInfo();
+        if(usersInfo.getDefaultAfterDate() == null && usersInfo.getDefaultBeforeDate() == null) {
+            return;
+        }
+        DateTime defaultAfterDate = usersInfo.getDefaultAfterDate();
+        DateTime defaultBeforeDate = usersInfo.getDefaultBeforeDate();
+        for(User user : usersInfo.getUsers()) {
+            if(defaultAfterDate != null && user.getAfterDate() == null) {
+                user.setAfterDate(defaultAfterDate);
+            }
+            if(defaultBeforeDate != null && user.getBeforeDate() == null) {
+                user.setBeforeDate(defaultBeforeDate);
+            }
+        }
+    }
+
+    /**
+     * Overrides the client id in the configuration.
+     * @param clientId client id to use
+     */
+    public void setInstagramClientId(String clientId) {
+        this.config.setClientId(clientId);
+    }
+
+    /**
+     * Overrides authroized user tokens in the configuration.
+     * @param tokenStrings
+     */
+    public void setAuthorizedUserTokens(Collection<String> tokenStrings) {
+        ensureUsersInfo(this.config).setAuthorizedTokens(Sets.newHashSet(tokenStrings));
+    }
+
+    /**
+     * Overrides the default before date in the configuration
+     * @param beforeDate
+     */
+    public void setDefaultBeforeDate(DateTime beforeDate) {
+        ensureUsersInfo(this.config).setDefaultBeforeDate(beforeDate);
+    }
+
+    /**
+     * Overrides the default after date in the configuration
+     * @param afterDate
+     */
+    public void setDefaultAfterDate(DateTime afterDate) {
+        ensureUsersInfo(this.config).setDefaultAfterDate(afterDate);
+    }
+
+    /**
+     * Overrides the users in the configuration and sets the after date for each user. A NULL DateTime implies
+     * pull data from as early as possible.  If default before or after DateTimes are set, they will applied to all
+     * NULL DateTimes.
+     * @param usersWithAfterDate instagram user id mapped to BeforeDate time
+     */
+    public void setUsersWithAfterDate(Map<String, DateTime> usersWithAfterDate) {
+        Set<User> users = Sets.newHashSet();
+        for(String userId : usersWithAfterDate.keySet()) {
+            User user = new User();
+            user.setUserId(userId);
+            user.setAfterDate(usersWithAfterDate.get(userId));
+            users.add(user);
+        }
+        ensureUsersInfo(this.config).setUsers(users);
+    }
+
+    private UsersInfo ensureUsersInfo(InstagramConfiguration config) {
+        UsersInfo usersInfo = config.getUsersInfo();
+        if(usersInfo == null) {
+            usersInfo = new UsersInfo();
+            config.setUsersInfo(usersInfo);
+        }
+        return usersInfo;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java
new file mode 100644
index 0000000..4cfc282
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java
@@ -0,0 +1,129 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+package org.apache.streams.instagram.provider;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.User;
+import org.apache.streams.util.ComponentUtils;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
+import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
+import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger;
+import org.jinstagram.Instagram;
+import org.jinstagram.entity.common.Pagination;
+import org.jinstagram.entity.users.feed.MediaFeed;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.jinstagram.exceptions.InstagramBadRequestException;
+import org.jinstagram.exceptions.InstagramRateLimitException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Executes on all of the Instagram requests to collect the media feed data.
+ * <p/>
+ * If errors/exceptions occur when trying to gather data for a particular user, that user is skipped and the collector
+ * move on to the next user.  If a rate limit exception occurs it employs an exponential back off strategy.
+ */
+public abstract class InstagramDataCollector<T> implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramDataCollector.class);
+
+    protected Queue<StreamsDatum> dataQueue; //exposed for testing
+    private InstagramConfiguration config;
+    private AtomicBoolean isCompleted;
+    private SimpleTokenManager<InstagramOauthToken> tokenManger;
+    protected int consecutiveErrorCount;
+    protected BackOffStrategy backOffStrategy;
+    private Instagram instagram;
+
+
+    public InstagramDataCollector(Queue<StreamsDatum> queue, InstagramConfiguration config) {
+        this.dataQueue = queue;
+        this.config = config;
+        this.isCompleted = new AtomicBoolean(false);
+        this.tokenManger = new BasicTokenManger<InstagramOauthToken>();
+        for (String tokens : this.config.getUsersInfo().getAuthorizedTokens()) {
+            this.tokenManger.addTokenToPool(new InstagramOauthToken(tokens));
+        }
+        this.consecutiveErrorCount = 0;
+        this.backOffStrategy = new ExponentialBackOffStrategy(2);
+        this.instagram = new Instagram(this.config.getClientId());
+    }
+
+
+    /**
+     * If there are authorized tokens available, it sets a new token for the client and returns
+     * the client.  If there are no available tokens, it simply returns the client that was
+     * initialized in the constructor with client id.
+     * @return
+     */
+    protected Instagram getNextInstagramClient() {
+        if(this.tokenManger.numAvailableTokens() > 0) {
+            this.instagram.setAccessToken(this.tokenManger.getNextAvailableToken());
+        }
+        return this.instagram;
+    }
+
+    protected void queueData(Collection<T> userData, String userId) {
+        if (userData == null) {
+            LOGGER.warn("User id, {}, returned a NULL data from instagram.", userId);
+        } else {
+            for (T data : userData) {
+                ComponentUtils.offerUntilSuccess(convertToStreamsDatum(data), this.dataQueue);
+            }
+        }
+    }
+
+    /**
+     * @return true when the collector has queued all of the available media feed data for the provided users.
+     */
+    public boolean isCompleted() {
+        return this.isCompleted.get();
+    }
+
+    @Override
+    public void run() {
+        try {
+            for (User user : this.config.getUsersInfo().getUsers()) {
+                collectInstagramDataForUser(user);
+            }
+        } catch (Exception e) {
+            LOGGER.error("Shutting down InstagramCollector. Exception occured: {}", e.getMessage());
+        }
+        this.isCompleted.set(true);
+    }
+
+    /**
+     * Pull instagram data for a user and queues the resulting data.
+     * @param user
+     * @throws Exception
+     */
+    protected abstract void collectInstagramDataForUser(User user);
+
+    /**
+     * Takes an Instagram Object and sets it as the document of a streams datum and sets the id of the streams datum.
+     * @param item
+     * @return
+     */
+    protected abstract StreamsDatum convertToStreamsDatum(T item);
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
deleted file mode 100644
index 8dc7a82..0000000
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance *
-http://www.apache.org/licenses/LICENSE-2.0 *
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License. */
-package org.apache.streams.instagram.provider;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.streams.instagram.InstagramConfiguration;
-import org.apache.streams.instagram.User;
-import org.apache.streams.util.ComponentUtils;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
-import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
-import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger;
-import org.jinstagram.Instagram;
-import org.jinstagram.entity.common.Pagination;
-import org.jinstagram.entity.users.feed.MediaFeed;
-import org.jinstagram.entity.users.feed.MediaFeedData;
-import org.jinstagram.exceptions.InstagramBadRequestException;
-import org.jinstagram.exceptions.InstagramRateLimitException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Executes on all of the Instagram requests to collect the media feed data.
- * <p/>
- * If errors/exceptions occur when trying to gather data for a particular user, that user is skipped and the collector
- * move on to the next user.  If a rate limit exception occurs it employs an exponential back off strategy.
- */
-public class InstagramRecentMediaCollector implements Runnable {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollector.class);
-    protected static final int MAX_ATTEMPTS = 5;
-    protected static final int SLEEP_SECS = 5; //5 seconds
-
-    protected Queue<MediaFeedData> dataQueue; //exposed for testing
-    private InstagramConfiguration config;
-    private AtomicBoolean isCompleted;
-    private SimpleTokenManager<InstagramOauthToken> tokenManger;
-    private int consecutiveErrorCount;
-    private BackOffStrategy backOffStrategy;
-    private Instagram instagram;
-
-
-    public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramConfiguration config) {
-        this.dataQueue = queue;
-        this.config = config;
-        this.isCompleted = new AtomicBoolean(false);
-        this.tokenManger = new BasicTokenManger<InstagramOauthToken>();
-        for (String tokens : this.config.getUsersInfo().getAuthorizedTokens()) {
-            this.tokenManger.addTokenToPool(new InstagramOauthToken(tokens));
-        }
-        this.consecutiveErrorCount = 0;
-        this.backOffStrategy = new ExponentialBackOffStrategy(2);
-        this.instagram = new Instagram(this.config.getClientId());
-    }
-
-
-    /**
-     * If there are authorized tokens available, it sets a new token for the client and returns
-     * the client.  If there are no available tokens, it simply returns the client that was
-     * initialized in the constructor with client id.
-     * @return
-     */
-    @VisibleForTesting
-    protected Instagram getNextInstagramClient() {
-        if(this.tokenManger.numAvailableTokens() > 0) {
-            this.instagram.setAccessToken(this.tokenManger.getNextAvailableToken());
-        }
-        return this.instagram;
-    }
-
-    private void queueData(MediaFeed userFeed, String userId) {
-        if (userFeed == null) {
-            LOGGER.warn("User id, {}, returned a NULL media feed from instagram.", userId);
-        } else {
-            for (MediaFeedData data : userFeed.getData()) {
-                ComponentUtils.offerUntilSuccess(data, this.dataQueue);
-            }
-        }
-    }
-
-    /**
-     * @return true when the collector has queued all of the available media feed data for the provided users.
-     */
-    public boolean isCompleted() {
-        return this.isCompleted.get();
-    }
-
-    @Override
-    public void run() {
-        try {
-            for (User user : this.config.getUsersInfo().getUsers()) {
-                collectMediaFeed(user);
-            }
-        } catch (Exception e) {
-            LOGGER.error("Shutting down InstagramCollector. Exception occured: {}", e.getMessage());
-        }
-        this.isCompleted.set(true);
-    }
-
-    /**
-     * Pull Recement Media for a user and queues the resulting data. Will try a single call 5 times before failing and
-     * moving on to the next call or returning.
-     * @param user
-     * @throws Exception
-     */
-    @VisibleForTesting
-    protected void collectMediaFeed(User user) throws Exception {
-        Pagination pagination = null;
-        do {
-            int attempts = 0;
-            boolean succesfullDataPull = false;
-            while (!succesfullDataPull && attempts < MAX_ATTEMPTS) {
-                ++attempts;
-                MediaFeed feed = null;
-                try {
-                    if (pagination == null) {
-                        feed = getNextInstagramClient().getRecentMediaFeed(Long.valueOf(user.getUserId()),
-                                0,
-                                null,
-                                null,
-                                user.getBeforeDate() == null ? null : user.getBeforeDate().toDate(),
-                                user.getAfterDate() == null ? null : user.getAfterDate().toDate());
-                    } else {
-                        feed = getNextInstagramClient().getRecentMediaNextPage(pagination);
-                    }
-                } catch (Exception e) {
-                    if(e instanceof InstagramRateLimitException) {
-                        LOGGER.warn("Received rate limit exception from Instagram, backing off. : {}", e);
-                        this.backOffStrategy.backOff();
-                    } else if(e instanceof InstagramBadRequestException) {
-                        LOGGER.error("Received Bad Requests exception form Instagram: {}", e);
-                        attempts = MAX_ATTEMPTS; //don't repeat bad requests.
-                        ++this.consecutiveErrorCount;
-                    } else {
-                        LOGGER.error("Received Expection while attempting to poll Instagram: {}", e);
-                        ++this.consecutiveErrorCount;
-                    }
-                    if(this.consecutiveErrorCount > Math.max(this.tokenManger.numAvailableTokens(), MAX_ATTEMPTS*2)) {
-                        throw new Exception("InstagramCollector failed to successfully connect to instagram on "+this.consecutiveErrorCount+" attempts.");
-                    }
-                }
-                if(succesfullDataPull = feed != null) {
-                    this.consecutiveErrorCount = 0;
-                    this.backOffStrategy.reset();
-                    pagination = feed.getPagination();
-                    queueData(feed, user.getUserId());
-                }
-            }
-            if(!succesfullDataPull) {
-                LOGGER.error("Failed to get data from instagram for user id, {}, skipping user.", user.getUserId());
-            }
-        } while (pagination != null && pagination.hasNextPage());
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
deleted file mode 100644
index 874fd19..0000000
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance *
-http://www.apache.org/licenses/LICENSE-2.0 *
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License. */
-package org.apache.streams.instagram.provider;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.instagram.*;
-import org.apache.streams.util.ComponentUtils;
-import org.apache.streams.util.SerializationUtil;
-import org.jinstagram.auth.model.Token;
-import org.jinstagram.entity.users.feed.MediaFeedData;
-import org.joda.time.DateTime;
-
-import java.math.BigInteger;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Instagram {@link org.apache.streams.core.StreamsProvider} that provides the recent media data for a group of users
- */
-public class InstagramRecentMediaProvider implements StreamsProvider {
-
-    private InstagramConfiguration config;
-    private InstagramRecentMediaCollector dataCollector;
-    protected Queue<MediaFeedData> mediaFeedQueue; //exposed for testing
-    private ExecutorService executorService;
-    private AtomicBoolean isCompleted;
-
-    public InstagramRecentMediaProvider() {
-        this.config = InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram"));
-    }
-
-    public InstagramRecentMediaProvider(InstagramConfiguration config) {
-        this.config = (InstagramConfiguration) SerializationUtil.cloneBySerialization(config);
-    }
-
-    @Override
-    public void startStream() {
-        this.dataCollector = getInstagramRecentMediaCollector();
-        this.executorService = Executors.newSingleThreadExecutor();
-        this.executorService.submit(this.dataCollector);
-    }
-
-    /**
-     * EXPOSED FOR TESTING
-     * @return
-     */
-    @VisibleForTesting
-    protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
-        this.updateUserInfoList();
-        return new InstagramRecentMediaCollector(this.mediaFeedQueue, this.config);
-    }
-
-
-    @Override
-    public StreamsResultSet readCurrent() {
-        Queue<StreamsDatum> batch = Queues.newConcurrentLinkedQueue();
-        MediaFeedData data = null;
-        while(!this.mediaFeedQueue.isEmpty()) {
-            data = ComponentUtils.pollWhileNotEmpty(this.mediaFeedQueue);
-            batch.add(new StreamsDatum(data, data.getId()));
-        }
-        this.isCompleted.set(batch.size() == 0 && this.mediaFeedQueue.isEmpty() && this.dataCollector.isCompleted());
-        return new StreamsResultSet(batch);
-    }
-
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
-    }
-
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return !this.isCompleted.get();
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        this.mediaFeedQueue = Queues.newConcurrentLinkedQueue();
-        this.isCompleted = new AtomicBoolean(false);
-    }
-
-    @Override
-    public void cleanUp() {
-        this.executorService.shutdown();
-        try {
-            this.executorService.awaitTermination(5, TimeUnit.SECONDS);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-        } finally {
-            this.executorService = null;
-        }
-    }
-
-    /**
-     * Add default start and stop points if necessary.
-     */
-    private void updateUserInfoList() {
-        UsersInfo usersInfo = this.config.getUsersInfo();
-        if(usersInfo.getDefaultAfterDate() == null && usersInfo.getDefaultBeforeDate() == null) {
-            return;
-        }
-        DateTime defaultAfterDate = usersInfo.getDefaultAfterDate();
-        DateTime defaultBeforeDate = usersInfo.getDefaultBeforeDate();
-        for(User user : usersInfo.getUsers()) {
-            if(defaultAfterDate != null && user.getAfterDate() == null) {
-                user.setAfterDate(defaultAfterDate);
-            }
-            if(defaultBeforeDate != null && user.getBeforeDate() == null) {
-                user.setBeforeDate(defaultBeforeDate);
-            }
-        }
-    }
-
-    /**
-     * Overrides the client id in the configuration.
-     * @param clientId client id to use
-     */
-    public void setInstagramClientId(String clientId) {
-        this.config.setClientId(clientId);
-    }
-
-    /**
-     * Overrides authroized user tokens in the configuration.
-     * @param tokenStrings
-     */
-    public void setAuthorizedUserTokens(Collection<String> tokenStrings) {
-        ensureUsersInfo(this.config).setAuthorizedTokens(Sets.newHashSet(tokenStrings));
-    }
-
-    /**
-     * Overrides the default before date in the configuration
-     * @param beforeDate
-     */
-    public void setDefaultBeforeDate(DateTime beforeDate) {
-        ensureUsersInfo(this.config).setDefaultBeforeDate(beforeDate);
-    }
-
-    /**
-     * Overrides the default after date in the configuration
-     * @param afterDate
-     */
-    public void setDefaultAfterDate(DateTime afterDate) {
-        ensureUsersInfo(this.config).setDefaultAfterDate(afterDate);
-    }
-
-    /**
-     * Overrides the users in the configuration and sets the after date for each user. A NULL DateTime implies
-     * pull data from as early as possible.  If default before or after DateTimes are set, they will applied to all
-     * NULL DateTimes.
-     * @param usersWithAfterDate instagram user id mapped to BeforeDate time
-     */
-    public void setUsersWithAfterDate(Map<String, DateTime> usersWithAfterDate) {
-        Set<User> users = Sets.newHashSet();
-        for(String userId : usersWithAfterDate.keySet()) {
-            User user = new User();
-            user.setUserId(userId);
-            user.setAfterDate(usersWithAfterDate.get(userId));
-            users.add(user);
-        }
-        ensureUsersInfo(this.config).setUsers(users);
-    }
-
-    private UsersInfo ensureUsersInfo(InstagramConfiguration config) {
-        UsersInfo usersInfo = config.getUsersInfo();
-        if(usersInfo == null) {
-            usersInfo = new UsersInfo();
-            config.setUsersInfo(usersInfo);
-        }
-        return usersInfo;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
new file mode 100644
index 0000000..952e3bf
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
@@ -0,0 +1,172 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+package org.apache.streams.instagram.provider.recentmedia;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.User;
+import org.apache.streams.instagram.provider.InstagramOauthToken;
+import org.apache.streams.util.ComponentUtils;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
+import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
+import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger;
+import org.jinstagram.Instagram;
+import org.jinstagram.entity.common.Pagination;
+import org.jinstagram.entity.users.feed.MediaFeed;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.jinstagram.exceptions.InstagramBadRequestException;
+import org.jinstagram.exceptions.InstagramRateLimitException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Executes on all of the Instagram requests to collect the media feed data.
+ * <p/>
+ * If errors/exceptions occur when trying to gather data for a particular user, that user is skipped and the collector
+ * move on to the next user.  If a rate limit exception occurs it employs an exponential back off strategy.
+ */
+public class InstagramRecentMediaCollector implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollector.class);
+    protected static final int MAX_ATTEMPTS = 5;
+    protected static final int SLEEP_SECS = 5; //5 seconds
+
+    protected Queue<MediaFeedData> dataQueue; //exposed for testing
+    private InstagramConfiguration config;
+    private AtomicBoolean isCompleted;
+    private SimpleTokenManager<InstagramOauthToken> tokenManger;
+    private int consecutiveErrorCount;
+    private BackOffStrategy backOffStrategy;
+    private Instagram instagram;
+
+
+    public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramConfiguration config) {
+        this.dataQueue = queue;
+        this.config = config;
+        this.isCompleted = new AtomicBoolean(false);
+        this.tokenManger = new BasicTokenManger<InstagramOauthToken>();
+        for (String tokens : this.config.getUsersInfo().getAuthorizedTokens()) {
+            this.tokenManger.addTokenToPool(new InstagramOauthToken(tokens));
+        }
+        this.consecutiveErrorCount = 0;
+        this.backOffStrategy = new ExponentialBackOffStrategy(2);
+        this.instagram = new Instagram(this.config.getClientId());
+    }
+
+
+    /**
+     * If there are authorized tokens available, it sets a new token for the client and returns
+     * the client.  If there are no available tokens, it simply returns the client that was
+     * initialized in the constructor with client id.
+     * @return
+     */
+    @VisibleForTesting
+    protected Instagram getNextInstagramClient() {
+        if(this.tokenManger.numAvailableTokens() > 0) {
+            this.instagram.setAccessToken(this.tokenManger.getNextAvailableToken());
+        }
+        return this.instagram;
+    }
+
+    private void queueData(MediaFeed userFeed, String userId) {
+        if (userFeed == null) {
+            LOGGER.warn("User id, {}, returned a NULL media feed from instagram.", userId);
+        } else {
+            for (MediaFeedData data : userFeed.getData()) {
+                ComponentUtils.offerUntilSuccess(data, this.dataQueue);
+            }
+        }
+    }
+
+    /**
+     * @return true when the collector has queued all of the available media feed data for the provided users.
+     */
+    public boolean isCompleted() {
+        return this.isCompleted.get();
+    }
+
+    @Override
+    public void run() {
+        try {
+            for (User user : this.config.getUsersInfo().getUsers()) {
+                collectMediaFeed(user);
+            }
+        } catch (Exception e) {
+            LOGGER.error("Shutting down InstagramCollector. Exception occured: {}", e.getMessage());
+        }
+        this.isCompleted.set(true);
+    }
+
+    /**
+     * Pull Recement Media for a user and queues the resulting data. Will try a single call 5 times before failing and
+     * moving on to the next call or returning.
+     * @param user
+     * @throws Exception
+     */
+    @VisibleForTesting
+    protected void collectMediaFeed(User user) throws Exception {
+        Pagination pagination = null;
+        do {
+            int attempts = 0;
+            boolean succesfullDataPull = false;
+            while (!succesfullDataPull && attempts < MAX_ATTEMPTS) {
+                ++attempts;
+                MediaFeed feed = null;
+                try {
+                    if (pagination == null) {
+                        feed = getNextInstagramClient().getRecentMediaFeed(Long.valueOf(user.getUserId()),
+                                0,
+                                null,
+                                null,
+                                user.getBeforeDate() == null ? null : user.getBeforeDate().toDate(),
+                                user.getAfterDate() == null ? null : user.getAfterDate().toDate());
+                    } else {
+                        feed = getNextInstagramClient().getRecentMediaNextPage(pagination);
+                    }
+                } catch (Exception e) {
+                    if(e instanceof InstagramRateLimitException) {
+                        LOGGER.warn("Received rate limit exception from Instagram, backing off. : {}", e);
+                        this.backOffStrategy.backOff();
+                    } else if(e instanceof InstagramBadRequestException) {
+                        LOGGER.error("Received Bad Requests exception form Instagram: {}", e);
+                        attempts = MAX_ATTEMPTS; //don't repeat bad requests.
+                        ++this.consecutiveErrorCount;
+                    } else {
+                        LOGGER.error("Received Expection while attempting to poll Instagram: {}", e);
+                        ++this.consecutiveErrorCount;
+                    }
+                    if(this.consecutiveErrorCount > Math.max(this.tokenManger.numAvailableTokens(), MAX_ATTEMPTS*2)) {
+                        throw new Exception("InstagramCollector failed to successfully connect to instagram on "+this.consecutiveErrorCount+" attempts.");
+                    }
+                }
+                if(succesfullDataPull = feed != null) {
+                    this.consecutiveErrorCount = 0;
+                    this.backOffStrategy.reset();
+                    pagination = feed.getPagination();
+                    queueData(feed, user.getUserId());
+                }
+            }
+            if(!succesfullDataPull) {
+                LOGGER.error("Failed to get data from instagram for user id, {}, skipping user.", user.getUserId());
+            }
+        } while (pagination != null && pagination.hasNextPage());
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
new file mode 100644
index 0000000..39acbd4
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
@@ -0,0 +1,201 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+package org.apache.streams.instagram.provider.recentmedia;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.instagram.*;
+import org.apache.streams.instagram.provider.recentmedia.InstagramRecentMediaCollector;
+import org.apache.streams.util.ComponentUtils;
+import org.apache.streams.util.SerializationUtil;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Instagram {@link org.apache.streams.core.StreamsProvider} that provides the recent media data for a group of users
+ */
+public class InstagramRecentMediaProvider implements StreamsProvider {
+
+    private InstagramConfiguration config;
+    private InstagramRecentMediaCollector dataCollector;
+    protected Queue<MediaFeedData> mediaFeedQueue; //exposed for testing
+    private ExecutorService executorService;
+    private AtomicBoolean isCompleted;
+
+    public InstagramRecentMediaProvider() {
+        this.config = InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram"));
+    }
+
+    public InstagramRecentMediaProvider(InstagramConfiguration config) {
+        this.config = (InstagramConfiguration) SerializationUtil.cloneBySerialization(config);
+    }
+
+    @Override
+    public void startStream() {
+        this.dataCollector = getInstagramRecentMediaCollector();
+        this.executorService = Executors.newSingleThreadExecutor();
+        this.executorService.submit(this.dataCollector);
+    }
+
+    /**
+     * EXPOSED FOR TESTING
+     * @return
+     */
+    @VisibleForTesting
+    protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
+        this.updateUserInfoList();
+        return new InstagramRecentMediaCollector(this.mediaFeedQueue, this.config);
+    }
+
+
+    @Override
+    public StreamsResultSet readCurrent() {
+        Queue<StreamsDatum> batch = Queues.newConcurrentLinkedQueue();
+        MediaFeedData data = null;
+        while(!this.mediaFeedQueue.isEmpty()) {
+            data = ComponentUtils.pollWhileNotEmpty(this.mediaFeedQueue);
+            batch.add(new StreamsDatum(data, data.getId()));
+        }
+        this.isCompleted.set(batch.size() == 0 && this.mediaFeedQueue.isEmpty() && this.dataCollector.isCompleted());
+        return new StreamsResultSet(batch);
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return !this.isCompleted.get();
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        this.mediaFeedQueue = Queues.newConcurrentLinkedQueue();
+        this.isCompleted = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void cleanUp() {
+        this.executorService.shutdown();
+        try {
+            this.executorService.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        } finally {
+            this.executorService = null;
+        }
+    }
+
+    /**
+     * Add default start and stop points if necessary.
+     */
+    private void updateUserInfoList() {
+        UsersInfo usersInfo = this.config.getUsersInfo();
+        if(usersInfo.getDefaultAfterDate() == null && usersInfo.getDefaultBeforeDate() == null) {
+            return;
+        }
+        DateTime defaultAfterDate = usersInfo.getDefaultAfterDate();
+        DateTime defaultBeforeDate = usersInfo.getDefaultBeforeDate();
+        for(User user : usersInfo.getUsers()) {
+            if(defaultAfterDate != null && user.getAfterDate() == null) {
+                user.setAfterDate(defaultAfterDate);
+            }
+            if(defaultBeforeDate != null && user.getBeforeDate() == null) {
+                user.setBeforeDate(defaultBeforeDate);
+            }
+        }
+    }
+
+    /**
+     * Overrides the client id in the configuration.
+     * @param clientId client id to use
+     */
+    public void setInstagramClientId(String clientId) {
+        this.config.setClientId(clientId);
+    }
+
+    /**
+     * Overrides authroized user tokens in the configuration.
+     * @param tokenStrings
+     */
+    public void setAuthorizedUserTokens(Collection<String> tokenStrings) {
+        ensureUsersInfo(this.config).setAuthorizedTokens(Sets.newHashSet(tokenStrings));
+    }
+
+    /**
+     * Overrides the default before date in the configuration
+     * @param beforeDate
+     */
+    public void setDefaultBeforeDate(DateTime beforeDate) {
+        ensureUsersInfo(this.config).setDefaultBeforeDate(beforeDate);
+    }
+
+    /**
+     * Overrides the default after date in the configuration
+     * @param afterDate
+     */
+    public void setDefaultAfterDate(DateTime afterDate) {
+        ensureUsersInfo(this.config).setDefaultAfterDate(afterDate);
+    }
+
+    /**
+     * Overrides the users in the configuration and sets the after date for each user. A NULL DateTime implies
+     * pull data from as early as possible.  If default before or after DateTimes are set, they will applied to all
+     * NULL DateTimes.
+     * @param usersWithAfterDate instagram user id mapped to BeforeDate time
+     */
+    public void setUsersWithAfterDate(Map<String, DateTime> usersWithAfterDate) {
+        Set<User> users = Sets.newHashSet();
+        for(String userId : usersWithAfterDate.keySet()) {
+            User user = new User();
+            user.setUserId(userId);
+            user.setAfterDate(usersWithAfterDate.get(userId));
+            users.add(user);
+        }
+        ensureUsersInfo(this.config).setUsers(users);
+    }
+
+    private UsersInfo ensureUsersInfo(InstagramConfiguration config) {
+        UsersInfo usersInfo = config.getUsersInfo();
+        if(usersInfo == null) {
+            usersInfo = new UsersInfo();
+            config.setUsersInfo(usersInfo);
+        }
+        return usersInfo;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java
new file mode 100644
index 0000000..3bae50f
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java
@@ -0,0 +1,82 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+package org.apache.streams.instagram.provider.userinfo;
+
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.InstagramConfigurator;
+import org.apache.streams.util.SerializationUtil;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ */
+public class InstagramUserInfoProvider implements StreamsProvider {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramUserInfoProvider.class);
+
+    private InstagramConfiguration config;
+    private AtomicBoolean isComplete;
+
+    public InstagramUserInfoProvider() {
+        this.config = InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram"));
+    }
+
+    public InstagramUserInfoProvider(InstagramConfiguration config) {
+        this.config = SerializationUtil.cloneBySerialization(config);
+    }
+
+    @Override
+    public void startStream() {
+
+    }
+
+    @Override
+    public StreamsResultSet readCurrent() {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return this.isComplete.get();
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        this.isComplete = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
deleted file mode 100644
index 1d234c9..0000000
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance *
-http://www.apache.org/licenses/LICENSE-2.0 *
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License. */
-package org.apache.streams.instagram.provider;
-
-import com.carrotsearch.randomizedtesting.RandomizedTest;
-import com.carrotsearch.randomizedtesting.annotations.Repeat;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
-import org.apache.streams.instagram.InstagramConfiguration;
-import org.apache.streams.instagram.InstagramUserInformationConfiguration;
-import org.apache.streams.instagram.User;
-import org.apache.streams.instagram.UsersInfo;
-import org.jinstagram.Instagram;
-import org.jinstagram.entity.common.Pagination;
-import org.jinstagram.entity.users.feed.MediaFeed;
-import org.jinstagram.entity.users.feed.MediaFeedData;
-import org.jinstagram.exceptions.InstagramBadRequestException;
-import org.jinstagram.exceptions.InstagramException;
-import org.jinstagram.exceptions.InstagramRateLimitException;
-import org.joda.time.DateTime;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.junit.Assert.*;
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for {@link org.apache.streams.instagram.provider.InstagramRecentMediaCollector}
- */
-public class InstagramRecentMediaCollectorTest extends RandomizedTest {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollectorTest.class);
-
-    private int expectedDataCount = 0;
-
-
-
-
-    @Test
-    @Repeat(iterations = 3)
-    public void testRun() {
-        this.expectedDataCount = 0;
-        Queue<MediaFeedData> data = Queues.newConcurrentLinkedQueue();
-        InstagramConfiguration config = new InstagramConfiguration();
-        UsersInfo usersInfo = new UsersInfo();
-        config.setUsersInfo(usersInfo);
-        Set<User> users = creatUsers(randomIntBetween(0, 100));
-        usersInfo.setUsers(users);
-
-        final Instagram mockInstagram = createMockInstagramClient();
-        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(data, config) {
-            @Override
-            protected Instagram getNextInstagramClient() {
-                return mockInstagram;
-            }
-        };
-        assertFalse(collector.isCompleted());
-        collector.run();
-        assertTrue(collector.isCompleted());
-        assertEquals(this.expectedDataCount, data.size());
-    }
-
-    private Instagram createMockInstagramClient() {
-        final Instagram instagramClient = mock(Instagram.class);
-        try {
-            final InstagramException mockException = mock(InstagramException.class);
-            when(mockException.getRemainingLimitStatus()).thenReturn(-1);
-            when(mockException.getMessage()).thenReturn("MockInstagramException message");
-            when(instagramClient.getRecentMediaFeed(anyLong(), anyInt(), anyString(), anyString(), any(Date.class), any(Date.class))).thenAnswer(new Answer<MediaFeed>() {
-                @Override
-                public MediaFeed answer(InvocationOnMock invocationOnMock) throws Throwable {
-                    if (randomInt(20) == 0) { //5% throw exceptions
-                        int type = randomInt(4);
-                        if (type == 0)
-                            throw mock(InstagramRateLimitException.class);
-                        else if (type == 1)
-                            throw mock(InstagramBadRequestException.class);
-                        else if (type == 2)
-                            throw mock(InstagramException.class);
-                        else
-                            throw new Exception();
-                    } else {
-                        return createRandomMockMediaFeed();
-                    }
-                }
-            });
-            when(instagramClient.getRecentMediaNextPage(any(Pagination.class))).thenAnswer(new Answer<MediaFeed>() {
-                @Override
-                public MediaFeed answer(InvocationOnMock invocationOnMock) throws Throwable {
-                    return createRandomMockMediaFeed();
-                }
-            });
-        } catch (InstagramException ie) {
-            fail("Failed to create mock instagram client.");
-        }
-        return instagramClient;
-    }
-
-    private Set<User> creatUsers(int numUsers) {
-        Set<User> users = Sets.newHashSet();
-        for(int i=0; i < numUsers; ++i) {
-            User user = new User();
-            user.setUserId(Integer.toString(randomInt()));
-            if(randomInt(2) == 0) {
-                user.setAfterDate(DateTime.now().minusSeconds(randomIntBetween(0, 1000)));
-            }
-            if(randomInt(2) == 0) {
-                user.setBeforeDate(DateTime.now());
-            }
-            users.add(user);
-        }
-        return users;
-    }
-
-    private MediaFeed createRandomMockMediaFeed() throws InstagramException {
-        MediaFeed feed = mock(MediaFeed.class);
-        when(feed.getData()).thenReturn(createData(randomInt(100)));
-        Pagination pagination = mock(Pagination.class);
-        if(randomInt(2) == 0) {
-            when(pagination.hasNextPage()).thenReturn(true);
-        } else {
-            when(pagination.hasNextPage()).thenReturn(false);
-        }
-        when(feed.getPagination()).thenReturn(pagination);
-        return feed;
-    }
-
-    private List<MediaFeedData> createData(int size) {
-        List<MediaFeedData> data = Lists.newLinkedList();
-        for(int i=0; i < size; ++i) {
-            data.add(mock(MediaFeedData.class));
-        }
-        this.expectedDataCount += size;
-        return data;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
deleted file mode 100644
index 71e569b..0000000
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance *
-http://www.apache.org/licenses/LICENSE-2.0 *
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License. */
-package org.apache.streams.instagram.provider;
-
-import com.google.common.collect.Sets;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.instagram.InstagramConfiguration;
-import org.apache.streams.instagram.InstagramUserInformationConfiguration;
-import org.apache.streams.instagram.User;
-import org.apache.streams.instagram.UsersInfo;
-import org.jinstagram.InstagramConfig;
-import org.jinstagram.entity.users.feed.MediaFeedData;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashSet;
-import java.util.Random;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-/**
- *
- */
-public class InstagramRecentMediaProviderTest {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaProviderTest.class);
-
-    @Test
-    public void testStartStream() throws InterruptedException {
-        final CountDownLatch latch = new CountDownLatch(1);
-        final InstagramRecentMediaCollector collectorStub = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), createNonNullConfiguration()) {
-
-            private volatile boolean isFinished = false;
-
-            @Override
-            public void run() {
-                this.isFinished = true;
-                latch.countDown();
-            }
-
-            @Override
-            public boolean isCompleted() {
-                return this.isFinished;
-            }
-        };
-
-        InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(null) {
-            @Override
-            protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
-                return collectorStub;
-            }
-        };
-        provider.prepare(null);
-        provider.startStream();
-
-        latch.await();
-        assertTrue(collectorStub.isCompleted());
-        StreamsResultSet result = provider.readCurrent();
-        assertNotNull(result);
-        assertEquals(0, result.size());
-        assertTrue(!provider.isRunning());
-        try {
-            provider.cleanUp();
-        } catch (Throwable throwable){
-            throwable.printStackTrace();
-            fail("Error durring clean up");
-        }
-    }
-
-    @Test
-    public void testReadCurrent() {
-        final long seed = System.nanoTime();
-        final Random rand = new Random(seed);
-        final CyclicBarrier test = new CyclicBarrier(2);
-        final CyclicBarrier produce = new CyclicBarrier(2);
-        final AtomicInteger batchCount = new AtomicInteger(0);
-        InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(createNonNullConfiguration()) {
-            @Override
-            protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
-                return new InstagramRecentMediaCollector(super.mediaFeedQueue, createNonNullConfiguration()) {
-
-                    private volatile boolean isFinished = false;
-
-
-
-                    public int getBatchCount() {
-                        return batchCount.get();
-                    }
-
-                    @Override
-                    public boolean isCompleted() {
-                        return isFinished;
-                    }
-
-                    @Override
-                    public void run() {
-                        int randInt = rand.nextInt(5);
-                        while(randInt != 0) {
-                            int batchSize = rand.nextInt(200);
-                            for(int i=0; i < batchSize; ++i) {
-                                while(!super.dataQueue.add(mock(MediaFeedData.class))) {
-                                    Thread.yield();
-                                }
-                            }
-                            batchCount.set(batchSize);
-                            try {
-                                test.await();
-                                produce.await();
-                            } catch (InterruptedException ie ) {
-                                Thread.currentThread().interrupt();
-                            } catch (BrokenBarrierException bbe) {
-                                Thread.currentThread().interrupt();
-                            }
-                            randInt = rand.nextInt(5);
-                        }
-                        batchCount.set(0);
-                        isFinished = true;
-                        try {
-                            test.await();
-                            produce.await();
-                        } catch (InterruptedException ie) {
-                            Thread.currentThread().interrupt();
-                        } catch (BrokenBarrierException bbe) {
-                            Thread.currentThread().interrupt();
-                        }
-                    }
-
-                };
-            }
-        };
-        provider.prepare(null);
-        provider.startStream();
-        while(provider.isRunning()) {
-            try {
-                test.await();
-                assertEquals("Seed == "+seed, batchCount.get(), provider.readCurrent().size());
-                produce.await();
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-            } catch (BrokenBarrierException bbe) {
-                Thread.currentThread().interrupt();
-            }
-
-        }
-    }
-
-    private InstagramConfiguration createNonNullConfiguration() {
-        InstagramConfiguration configuration = new InstagramConfiguration();
-        UsersInfo info = new UsersInfo();
-        configuration.setUsersInfo(info);
-        info.setUsers(new HashSet<User>());
-        info.setAuthorizedTokens(new HashSet<String>());
-        return configuration;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java
new file mode 100644
index 0000000..0020652
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java
@@ -0,0 +1,155 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+package org.apache.streams.instagram.provider.recentmedia;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.User;
+import org.apache.streams.instagram.UsersInfo;
+import org.jinstagram.Instagram;
+import org.jinstagram.entity.common.Pagination;
+import org.jinstagram.entity.users.feed.MediaFeed;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.jinstagram.exceptions.InstagramBadRequestException;
+import org.jinstagram.exceptions.InstagramException;
+import org.jinstagram.exceptions.InstagramRateLimitException;
+import org.joda.time.DateTime;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link org.apache.streams.instagram.provider.recentmedia.InstagramRecentMediaCollector}
+ */
+public class InstagramRecentMediaCollectorTest extends RandomizedTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollectorTest.class);
+
+    private int expectedDataCount = 0;
+
+
+
+
+    @Test
+    @Repeat(iterations = 3)
+    public void testRun() {
+        this.expectedDataCount = 0;
+        Queue<MediaFeedData> data = Queues.newConcurrentLinkedQueue();
+        InstagramConfiguration config = new InstagramConfiguration();
+        UsersInfo usersInfo = new UsersInfo();
+        config.setUsersInfo(usersInfo);
+        Set<User> users = creatUsers(randomIntBetween(0, 100));
+        usersInfo.setUsers(users);
+
+        final Instagram mockInstagram = createMockInstagramClient();
+        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(data, config) {
+            @Override
+            protected Instagram getNextInstagramClient() {
+                return mockInstagram;
+            }
+        };
+        assertFalse(collector.isCompleted());
+        collector.run();
+        assertTrue(collector.isCompleted());
+        assertEquals(this.expectedDataCount, data.size());
+    }
+
+    private Instagram createMockInstagramClient() {
+        final Instagram instagramClient = mock(Instagram.class);
+        try {
+            final InstagramException mockException = mock(InstagramException.class);
+            when(mockException.getRemainingLimitStatus()).thenReturn(-1);
+            when(mockException.getMessage()).thenReturn("MockInstagramException message");
+            when(instagramClient.getRecentMediaFeed(anyLong(), anyInt(), anyString(), anyString(), any(Date.class), any(Date.class))).thenAnswer(new Answer<MediaFeed>() {
+                @Override
+                public MediaFeed answer(InvocationOnMock invocationOnMock) throws Throwable {
+                    if (randomInt(20) == 0) { //5% throw exceptions
+                        int type = randomInt(4);
+                        if (type == 0)
+                            throw mock(InstagramRateLimitException.class);
+                        else if (type == 1)
+                            throw mock(InstagramBadRequestException.class);
+                        else if (type == 2)
+                            throw mock(InstagramException.class);
+                        else
+                            throw new Exception();
+                    } else {
+                        return createRandomMockMediaFeed();
+                    }
+                }
+            });
+            when(instagramClient.getRecentMediaNextPage(any(Pagination.class))).thenAnswer(new Answer<MediaFeed>() {
+                @Override
+                public MediaFeed answer(InvocationOnMock invocationOnMock) throws Throwable {
+                    return createRandomMockMediaFeed();
+                }
+            });
+        } catch (InstagramException ie) {
+            fail("Failed to create mock instagram client.");
+        }
+        return instagramClient;
+    }
+
+    private Set<User> creatUsers(int numUsers) {
+        Set<User> users = Sets.newHashSet();
+        for(int i=0; i < numUsers; ++i) {
+            User user = new User();
+            user.setUserId(Integer.toString(randomInt()));
+            if(randomInt(2) == 0) {
+                user.setAfterDate(DateTime.now().minusSeconds(randomIntBetween(0, 1000)));
+            }
+            if(randomInt(2) == 0) {
+                user.setBeforeDate(DateTime.now());
+            }
+            users.add(user);
+        }
+        return users;
+    }
+
+    private MediaFeed createRandomMockMediaFeed() throws InstagramException {
+        MediaFeed feed = mock(MediaFeed.class);
+        when(feed.getData()).thenReturn(createData(randomInt(100)));
+        Pagination pagination = mock(Pagination.class);
+        if(randomInt(2) == 0) {
+            when(pagination.hasNextPage()).thenReturn(true);
+        } else {
+            when(pagination.hasNextPage()).thenReturn(false);
+        }
+        when(feed.getPagination()).thenReturn(pagination);
+        return feed;
+    }
+
+    private List<MediaFeedData> createData(int size) {
+        List<MediaFeedData> data = Lists.newLinkedList();
+        for(int i=0; i < size; ++i) {
+            data.add(mock(MediaFeedData.class));
+        }
+        this.expectedDataCount += size;
+        return data;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java
new file mode 100644
index 0000000..4a6396b
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java
@@ -0,0 +1,172 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+package org.apache.streams.instagram.provider.recentmedia;
+
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.User;
+import org.apache.streams.instagram.UsersInfo;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+/**
+ *
+ */
+public class InstagramRecentMediaProviderTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaProviderTest.class);
+
+    @Test
+    public void testStartStream() throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final InstagramRecentMediaCollector collectorStub = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), createNonNullConfiguration()) {
+
+            private volatile boolean isFinished = false;
+
+            @Override
+            public void run() {
+                this.isFinished = true;
+                latch.countDown();
+            }
+
+            @Override
+            public boolean isCompleted() {
+                return this.isFinished;
+            }
+        };
+
+        InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(null) {
+            @Override
+            protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
+                return collectorStub;
+            }
+        };
+        provider.prepare(null);
+        provider.startStream();
+
+        latch.await();
+        assertTrue(collectorStub.isCompleted());
+        StreamsResultSet result = provider.readCurrent();
+        assertNotNull(result);
+        assertEquals(0, result.size());
+        assertTrue(!provider.isRunning());
+        try {
+            provider.cleanUp();
+        } catch (Throwable throwable){
+            throwable.printStackTrace();
+            fail("Error durring clean up");
+        }
+    }
+
+    @Test
+    public void testReadCurrent() {
+        final long seed = System.nanoTime();
+        final Random rand = new Random(seed);
+        final CyclicBarrier test = new CyclicBarrier(2);
+        final CyclicBarrier produce = new CyclicBarrier(2);
+        final AtomicInteger batchCount = new AtomicInteger(0);
+        InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(createNonNullConfiguration()) {
+            @Override
+            protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
+                return new InstagramRecentMediaCollector(super.mediaFeedQueue, createNonNullConfiguration()) {
+
+                    private volatile boolean isFinished = false;
+
+
+
+                    public int getBatchCount() {
+                        return batchCount.get();
+                    }
+
+                    @Override
+                    public boolean isCompleted() {
+                        return isFinished;
+                    }
+
+                    @Override
+                    public void run() {
+                        int randInt = rand.nextInt(5);
+                        while(randInt != 0) {
+                            int batchSize = rand.nextInt(200);
+                            for(int i=0; i < batchSize; ++i) {
+                                while(!super.dataQueue.add(mock(MediaFeedData.class))) {
+                                    Thread.yield();
+                                }
+                            }
+                            batchCount.set(batchSize);
+                            try {
+                                test.await();
+                                produce.await();
+                            } catch (InterruptedException ie ) {
+                                Thread.currentThread().interrupt();
+                            } catch (BrokenBarrierException bbe) {
+                                Thread.currentThread().interrupt();
+                            }
+                            randInt = rand.nextInt(5);
+                        }
+                        batchCount.set(0);
+                        isFinished = true;
+                        try {
+                            test.await();
+                            produce.await();
+                        } catch (InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                        } catch (BrokenBarrierException bbe) {
+                            Thread.currentThread().interrupt();
+                        }
+                    }
+
+                };
+            }
+        };
+        provider.prepare(null);
+        provider.startStream();
+        while(provider.isRunning()) {
+            try {
+                test.await();
+                assertEquals("Seed == "+seed, batchCount.get(), provider.readCurrent().size());
+                produce.await();
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            } catch (BrokenBarrierException bbe) {
+                Thread.currentThread().interrupt();
+            }
+
+        }
+    }
+
+    private InstagramConfiguration createNonNullConfiguration() {
+        InstagramConfiguration configuration = new InstagramConfiguration();
+        UsersInfo info = new UsersInfo();
+        configuration.setUsersInfo(info);
+        info.setUsers(new HashSet<User>());
+        info.setAuthorizedTokens(new HashSet<String>());
+        return configuration;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0df06b60/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java b/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
index 0975ea4..bc3583e 100644
--- a/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
+++ b/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
@@ -62,7 +62,7 @@ public class SerializationUtil {
     }
 
 
-    public static Object cloneBySerialization(Object obj) {
-        return deserialize(serialize(obj));
+    public static <T> T cloneBySerialization(T obj) {
+        return (T) deserialize(serialize(obj));
     }
 }


Mime
View raw message