streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mfrank...@apache.org
Subject [2/8] git commit: Backoff and token Utils classes implemented and tested
Date Mon, 18 Aug 2014 16:10:20 GMT
Backoff and token Utils classes implemented and tested


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ff5f8211
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ff5f8211
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ff5f8211

Branch: refs/heads/master
Commit: ff5f821121e98dccc5f72a313583e533f3a5bf8d
Parents: b22efee
Author: Ryan Ebanks <rebanks@Informations-MacBook-Pro.local>
Authored: Fri Aug 8 17:15:49 2014 -0500
Committer: Ryan Ebanks <rebanks@Informations-MacBook-Pro.local>
Committed: Fri Aug 8 17:15:49 2014 -0500

----------------------------------------------------------------------
 .../com/instagram/InstagramConfiguration.json   |  53 +++++-
 streams-util/pom.xml                            |   6 +-
 .../api/requests/backoff/BackOffException.java  |  48 +++++
 .../api/requests/backoff/BackOffStrategy.java   |  87 +++++++++
 .../impl/ConstantTimeBackOffStrategy.java       |  33 ++++
 .../impl/ExponentialBackOffStrategy.java        |  32 ++++
 .../backoff/impl/LinearTimeBackOffStrategy.java |  25 +++
 .../streams/util/oauth/tokens/OauthToken.java   |  19 ++
 .../tokens/tokenmanager/SimpleTokenManager.java |  39 ++++
 .../tokenmanager/impl/BasicTokenManger.java     |  72 ++++++++
 .../requests/backoff/BackOffStrategyTest.java   |  70 ++++++++
 .../ConstantTimeBackOffStrategyTest.java        |  26 +++
 .../backoff/ExponentialBackOffStrategyTest.java |  23 +++
 .../backoff/LinearTimeBackOffStartegyTest.java  |  22 +++
 .../tokenmanager/TestBasicTokenManager.java     | 176 +++++++++++++++++++
 15 files changed, 722 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
index f8f8117..7b894a2 100644
--- a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
+++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
@@ -9,13 +9,50 @@
             "type": "string",
             "description": "Your Instagram Client Id"
         },
-        "clientSecret": {
-            "type": "string",
-            "description": "Your Instagram Client secret"
-        },
-        "callbackUrl": {
-            "type": "string",
-            "description": "Your Instagream callback url"
+        "usersInfo": {
+            "type": "object",
+            "properties": {
+                "userIds": {
+                    "type": "array",
+                    "uniqueItems": true,
+                    "items": {
+                        "type": "object",
+                        "$ref": "#/definitions/userInfo"
+                    },
+                    "description": "List of user ids to gather data for. Type of data gathered
depends on provider"
+                },
+                "defaultAfterDate": {
+                    "type": "string",
+                    "format": "datetime",
+                    "description": "If the api allows to gather data by date range, this
date will be used as the start of the range for the request for all users that don't have
date ranges specified. If this is null it will pull from the earliest possible time"
+                },
+                "defaultBeforeDate": {
+                    "type": "string",
+                    "format": "datetime",
+                    "description": "If the api allows to gather data by date range, this
date will be used as the end of the range for the request for all users that don't have date
ranges specified. If this is null it will pull till current time."
+                }
+            }
+        }
+   },
+    "definitions": {
+        "userInfo": {
+            "type": "object",
+            "properties": {
+                "userId": {
+                    "type": "string",
+                    "description": "instagram user id"
+                },
+                "afterDate": {
+                    "type": "string",
+                    "format": "datetime",
+                    "description": "If the api allows to gather data by date range, this
date will be used as the start of the range for the request for this user. If this is null
it will use the defaultAfterDate."
+                },
+                "beforeDate": {
+                    "type": "string",
+                    "format": "datetime",
+                    "description": "If the api allows to gather data by date range, this
date will be used as the end of the range for the request for this user. If this is null
it will use the defaultBeforeDate."
+                }
+            }
         }
-   }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/pom.xml
----------------------------------------------------------------------
diff --git a/streams-util/pom.xml b/streams-util/pom.xml
index 0a48ec9..7a50201 100644
--- a/streams-util/pom.xml
+++ b/streams-util/pom.xml
@@ -43,6 +43,10 @@
             <groupId>joda-time</groupId>
             <artifactId>joda-time</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>com.carrotsearch.randomizedtesting</groupId>
+            <artifactId>randomizedtesting-runner</artifactId>
+            <version>2.1.2</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java
b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java
new file mode 100644
index 0000000..0bdd82d
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java
@@ -0,0 +1,48 @@
+package org.apache.streams.util.api.requests.backoff;
+
+/**
+ * Exception that is thrown when a {@link org.apache.streams.util.api.requests.backoff.BackOffStrategy}
has attempted to
+ * <code>backOff()</code> more than the {@link org.apache.streams.util.api.requests.backoff.BackOffStrategy}
was configured for.
+ */
+public class BackOffException extends Exception {
+
+    private int attemptCount;
+    private long sleepTime;
+
+    public BackOffException() {
+        this(-1, -1);
+    }
+
+    public BackOffException(String message) {
+        this(message, -1, -1);
+    }
+
+    public BackOffException(int attemptCount, long maxSleepTime) {
+        this.attemptCount = attemptCount;
+        this.sleepTime = maxSleepTime;
+    }
+
+    public BackOffException(String message, int attemptCount, long maxSleepTime) {
+        super(message);
+        this.attemptCount = attemptCount;
+        this.sleepTime = maxSleepTime;
+    }
+
+    /**
+     * Gets the number of back off attempts that happened before the exception was thrown.
If the function that
+     * initialized this exception does not set the number of attempts, -1 will be returned.
+     * @return number of attempts
+     */
+    public int getNumberOfBackOffsAttempted() {
+        return this.attemptCount;
+    }
+
+    /**
+     * Gets the longest sleep period that the strategy attempted. If the function that
+     * initialized this exception does not set the longest sleep period, -1 will be returned.
+     * @return the longest sleep period attempted, or -1 if it was not set
+     */
+    public long getLongestBackOff() {
+        return this.sleepTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java
b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java
new file mode 100644
index 0000000..628d37b
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java
@@ -0,0 +1,87 @@
+package org.apache.streams.util.api.requests.backoff;
+
+/**
+ * BackOffStrategy will cause the current thread to sleep for a specific amount of time.
This is used to adhere to
+ * api rate limits.
+ *
+ * The example below illustrates using a BackOffStrategy to slow down requests when you hit
a rate limit exception.
+ *
+ * <code>
+ *     public void pollApi(ApiClient apiClient, BackOffStrategy backOffStrategy) throws BackOffException
{
+ *          while( apiClient.hasMoreData() ) {
+ *              try {
+ *                  apiClient.pollData();
+ *              } catch (RateLimitException rle) {
+ *                  backOffStrategy.backOff();
+ *              }
+ *          }
+ *     }
+ * </code>
+ *
+ */
+public abstract class BackOffStrategy {
+
+    private long baseSleepTime;
+    private long lastSleepTime;
+    private int maxAttempts;
+    private int attemptsCount;
+
+    /**
+     * A BackOffStrategy that can effectively be used endlessly.
+     * @param baseBackOffTime amount of time to back off in milliseconds, must be greater than 0.
+     */
+    public BackOffStrategy(long baseBackOffTime) {
+        this(baseBackOffTime, -1);
+    }
+
+    /**
+     * A BackOffStrategy that has a limited number of uses before it throws a {@link org.apache.streams.util.api.requests.backoff.BackOffException}
+     * @param baseBackOffTime time to back off in milliseconds, must be greater than 0.
+     * @param maximumNumberOfBackOffAttempts maximum number of attempts, must be greater than
0 or -1. -1 indicates there is no maximum number of attempts.
+     */
+    public BackOffStrategy(long baseBackOffTime, int maximumNumberOfBackOffAttempts) {
+        if(baseBackOffTime <= 0) {
+            throw new IllegalArgumentException("backOffTimeInMilliSeconds is not greater
than 0 : "+baseBackOffTime);
+        }
+        if(maximumNumberOfBackOffAttempts<=0 && maximumNumberOfBackOffAttempts
!= -1) {
+            throw new IllegalArgumentException("maximumNumberOfBackOffAttempts is not greater
than 0 : "+maximumNumberOfBackOffAttempts);
+        }
+        this.baseSleepTime = baseBackOffTime;
+        this.maxAttempts = maximumNumberOfBackOffAttempts;
+        this.attemptsCount = 0;
+    }
+
+    /**
+     * Cause the current thread to sleep for an amount of time based on the implemented strategy.
If limits are set
+     * on the number of times the backOff can be called, an exception will be thrown.
+     * @throws BackOffException
+     */
+    public void backOff() throws BackOffException {
+        if(this.attemptsCount++ >= this.maxAttempts && this.maxAttempts != -1)
{
+            throw new BackOffException(this.attemptsCount-1, this.lastSleepTime);
+        } else {
+            try {
+                Thread.sleep(this.lastSleepTime = calculateBackOffTime(this.attemptsCount,
this.baseSleepTime));
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Resets the back off strategy to its original state.  After the call the strategy will
act as if {@link BackOffStrategy#backOff()}
+     * has never been called.
+     */
+    public void reset() {
+        this.attemptsCount = 0;
+    }
+
+    /**
+     * Calculate the amount of time in milliseconds that the strategy should back off for
+     * @param attemptCount the number of attempts the strategy has backed off. i.e. 1 ->
this is the first attempt, 2 -> this is the second attempt, etc.
+     * @param baseSleepTime the minimum amount of time it should back off for in milliseconds
+     * @return the amount of time it should back off in milliseconds
+     */
+    protected abstract long calculateBackOffTime(int attemptCount, long baseSleepTime);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java
b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java
new file mode 100644
index 0000000..bfc523a
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java
@@ -0,0 +1,33 @@
+package org.apache.streams.util.api.requests.backoff.impl;
+
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
+/**
+ * A {@link org.apache.streams.util.api.requests.backoff.BackOffStrategy} that causes the
current thread to sleep the
+ * same amount of time each time <code>backOff()</code> is called.
+ *
+ */
+public class ConstantTimeBackOffStrategy extends BackOffStrategy {
+
+    /**
+     * A ConstantTimeBackOffStrategy that can effectively be used endlessly.
+     * @param baseBackOffTimeInMiliseconds amount of time to back off in milliseconds
+     */
+    public ConstantTimeBackOffStrategy(long baseBackOffTimeInMiliseconds) {
+        this(baseBackOffTimeInMiliseconds, -1);
+    }
+
+    /**
+     * A ConstantTimeBackOffStrategy that has a limited number of uses before it throws a
{@link org.apache.streams.util.api.requests.backoff.BackOffException}
+     * @param baseBackOffTimeInMiliseconds time to back off in milliseconds, must be greater
than 0.
+     * @param maximumNumberOfBackOffAttempts maximum number of attempts, must be greater than
0 or -1. -1 indicates there is no maximum number of attempts.
+     */
+    public ConstantTimeBackOffStrategy(long baseBackOffTimeInMiliseconds, int maximumNumberOfBackOffAttempts)
{
+        super(baseBackOffTimeInMiliseconds, maximumNumberOfBackOffAttempts);
+    }
+
+    @Override
+    protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
+        return baseSleepTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java
b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java
new file mode 100644
index 0000000..af59a6a
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java
@@ -0,0 +1,32 @@
+package org.apache.streams.util.api.requests.backoff.impl;
+
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
+/**
+ * Exponential back off strategy.  Calculated by baseBackOffTimeInSeconds raised to the attempt-count
power.
+ */
+public class ExponentialBackOffStrategy extends BackOffStrategy {
+
+
+    /**
+     * Unlimited use ExponentialBackOffStrategy
+     * @param baseBackOffTimeInSeconds
+     */
+    public ExponentialBackOffStrategy(int baseBackOffTimeInSeconds) {
+        this(baseBackOffTimeInSeconds, -1);
+    }
+
+    /**
+     * Limited use ExponentialBackOffStrategy
+     * @param baseBackOffTimeInSeconds
+     * @param maxNumAttempts
+     */
+    public ExponentialBackOffStrategy(int baseBackOffTimeInSeconds, int maxNumAttempts) {
+        super(baseBackOffTimeInSeconds, maxNumAttempts);
+    }
+
+    @Override
+    protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
+        return Math.round(Math.pow(baseSleepTime, attemptCount)) * 1000;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java
b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java
new file mode 100644
index 0000000..55f62a2
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java
@@ -0,0 +1,25 @@
+package org.apache.streams.util.api.requests.backoff.impl;
+
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
+/**
+ * A {@link org.apache.streams.util.api.requests.backoff.BackOffStrategy} that causes back
offs in linear increments. Each
+ * attempt causes an increased back off period.
+ * Calculated by attemptNumber * baseBackOffAmount.
+ */
+public class LinearTimeBackOffStrategy extends BackOffStrategy{
+
+
+    public LinearTimeBackOffStrategy(int baseBackOffTimeInSeconds) {
+        this(baseBackOffTimeInSeconds, -1);
+    }
+
+    public LinearTimeBackOffStrategy(int baseBackOffTimeInSeconds, int maxAttempts) {
+        super(baseBackOffTimeInSeconds, -1);
+    }
+
+    @Override
+    protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
+        return 1000L * attemptCount * baseSleepTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/OauthToken.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/OauthToken.java
b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/OauthToken.java
new file mode 100644
index 0000000..df264c5
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/OauthToken.java
@@ -0,0 +1,19 @@
+package org.apache.streams.util.oauth.tokens;
+
+/**
+ *
+ */
+public abstract class OauthToken {
+
+    /**
+     * Equality check that every OauthToken subclass must implement; {@link #equals(Object)} delegates to it.
+     * @param o
+     * @return true if equal, and false otherwise
+     */
+    protected abstract boolean internalEquals(Object o);
+
+    @Override
+    public boolean equals(Object o) {
+        return this.internalEquals(o);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
new file mode 100644
index 0000000..d052da1
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
@@ -0,0 +1,39 @@
+package org.apache.streams.util.oauth.tokens.tokenmanager;
+
+import org.apache.streams.util.oauth.tokens.OauthToken;
+
+import java.util.Collection;
+
+/**
+ * Manages access to oauth tokens.  Allows a caller to add tokens to the token pool and receive
an available token.
+ */
+public interface SimpleTokenManager<T extends OauthToken> {
+
+
+    /**
+     * Adds a token to the available token pool.
+     * @param token Token to be added
+     * @return true, if token was successfully added to the pool and false otherwise.
+     */
+    public boolean addTokenToPool(T token);
+
+    /**
+     * Adds a {@link java.util.Collection} of tokens to the available token pool.
+     * @param tokens Tokens to be added
+     * @return true, if the token pool size increased after adding the tokens, and false
otherwise.
+     */
+    public boolean addAllTokensToPool(Collection<T> tokens);
+
+    /**
+     * Get an available token. If no tokens are available it returns null.
+     * @return next available token
+     */
+    public T getNextAvailableToken();
+
+    /**
+     * Get the number of available tokens
+     * @return number of available tokens
+     */
+    public int numAvailableTokens();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
new file mode 100644
index 0000000..20c8d20
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
@@ -0,0 +1,72 @@
+package org.apache.streams.util.oauth.tokens.tokenmanager.impl;
+
+import org.apache.streams.util.oauth.tokens.OauthToken;
+import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Manages a pool of tokens the most basic possible way.  If all tokens are added to the
manager before {@link BasicTokenManger#getNextAvailableToken() getNextAvailableToken}
+ * is called tokens are issued in the order they were added to the manager, FIFO.  The BasicTokenManager
acts as a circular queue
+ * of tokens.  Once the manager issues all available tokens it will cycle back to the first
token and start issuing tokens again.
+ *
+ * When adding tokens to the pool of available tokens, the manager will not add tokens that
are already in the pool.
+ *
+ * The manager class is thread safe.
+ */
+public class BasicTokenManger<T extends OauthToken> implements SimpleTokenManager<T>{
+
+    private ArrayList<T> availableTokens;
+    private int nextToken;
+
+    public BasicTokenManger() {
+        this(null);
+    }
+
+    public BasicTokenManger(Collection<T> tokens) {
+        if(tokens != null) {
+            this.availableTokens = new ArrayList<T>(tokens.size());
+            this.addAllTokensToPool(tokens);
+        } else {
+            this.availableTokens = new ArrayList<T>();
+        }
+        this.nextToken = 0;
+    }
+
+    @Override
+    public synchronized boolean addTokenToPool(T token) {
+        if(token == null || this.availableTokens.contains(token))
+            return false;
+        else
+            return this.availableTokens.add(token);
+    }
+
+    @Override
+    public synchronized boolean addAllTokensToPool(Collection<T> tokens) {
+        int startSize = this.availableTokens.size();
+        for(T token : tokens) {
+            this.addTokenToPool(token);
+        }
+        return startSize < this.availableTokens.size();
+    }
+
+    @Override
+    public synchronized T getNextAvailableToken() {
+        T token = null;
+        if(this.availableTokens.size() == 0) {
+            return token;
+        } else {
+            token = this.availableTokens.get(nextToken++);
+            if(nextToken == this.availableTokens.size()) {
+                nextToken = 0;
+            }
+            return token;
+        }
+    }
+
+    @Override
+    public synchronized int numAvailableTokens() {
+        return this.availableTokens.size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/BackOffStrategyTest.java
----------------------------------------------------------------------
diff --git a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/BackOffStrategyTest.java
b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/BackOffStrategyTest.java
new file mode 100644
index 0000000..5f3453e
--- /dev/null
+++ b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/BackOffStrategyTest.java
@@ -0,0 +1,70 @@
+package org.apache.streams.util.api.requests.backoff;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit Tests
+ */
+public class BackOffStrategyTest {
+
+
+    private class TestBackOff extends BackOffStrategy {
+
+        public TestBackOff(long sleep, int maxAttempts) {
+            super(sleep, maxAttempts);
+        }
+
+        @Override
+        protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
+            return baseSleepTime;
+        }
+    }
+
+    @Test
+    public void testUnlimitedBackOff() {
+        BackOffStrategy backOff = new TestBackOff(1, -1);
+        try {
+            for(int i=0; i < 100; ++i) {
+                backOff.backOff();
+            }
+        } catch (BackOffException boe) {
+            fail("Threw BackOffException.  Not expected action");
+        }
+    }
+
+    @Test
+    public void testLimitedUseBackOff()  {
+        BackOffStrategy backOff = new TestBackOff(1, 2);
+        try {
+            backOff.backOff();
+        } catch (BackOffException boe) {
+            fail("Threw BackOffExpection. Not expected action");
+        }
+        try {
+            backOff.backOff();
+        } catch (BackOffException boe) {
+            fail("Threw BackOffExpection. Not expected action");
+        }
+        try {
+            backOff.backOff();
+            fail("Expected BackOffException to be thrown.");
+        } catch (BackOffException boe) {
+
+        }
+    }
+
+    @Test
+    public void testBackOffSleep() throws BackOffException {
+        BackOffStrategy backOff = new TestBackOff(2000, 1);
+        long startTime = System.currentTimeMillis();
+        backOff.backOff();
+        long endTime = System.currentTimeMillis();
+        assertTrue(endTime - startTime >= 2000);
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ConstantTimeBackOffStrategyTest.java
----------------------------------------------------------------------
diff --git a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ConstantTimeBackOffStrategyTest.java
b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ConstantTimeBackOffStrategyTest.java
new file mode 100644
index 0000000..c9e7de9
--- /dev/null
+++ b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ConstantTimeBackOffStrategyTest.java
@@ -0,0 +1,26 @@
+package org.apache.streams.util.api.requests.backoff;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import org.apache.streams.util.api.requests.backoff.impl.ConstantTimeBackOffStrategy;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit Tests
+ */
+public class ConstantTimeBackOffStrategyTest extends RandomizedTest{
+
+
+    @Test
+    public void constantTimeBackOffStategy() {
+        BackOffStrategy backOff = new ConstantTimeBackOffStrategy(1);
+        assertEquals(1, backOff.calculateBackOffTime(1,1));
+        assertEquals(1, backOff.calculateBackOffTime(2,1));
+        assertEquals(1, backOff.calculateBackOffTime(3,1));
+        assertEquals(1, backOff.calculateBackOffTime(4,1));
+        assertEquals(1, backOff.calculateBackOffTime(randomIntBetween(1, Integer.MAX_VALUE),1));
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ExponentialBackOffStrategyTest.java
----------------------------------------------------------------------
diff --git a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ExponentialBackOffStrategyTest.java
b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ExponentialBackOffStrategyTest.java
new file mode 100644
index 0000000..43b42f7
--- /dev/null
+++ b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ExponentialBackOffStrategyTest.java
@@ -0,0 +1,23 @@
+package org.apache.streams.util.api.requests.backoff;
+
+import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit Tests
+ */
+public class ExponentialBackOffStrategyTest {
+
+    @Test
+    public void exponentialTimeBackOffStrategyTest() {
+        BackOffStrategy backOff = new ExponentialBackOffStrategy(1);
+        assertEquals(5000, backOff.calculateBackOffTime(1,5));
+        assertEquals(25000, backOff.calculateBackOffTime(2,5));
+        assertEquals(125000, backOff.calculateBackOffTime(3,5));
+        assertEquals(2000, backOff.calculateBackOffTime(1,2));
+        assertEquals(16000, backOff.calculateBackOffTime(4,2));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStartegyTest.java
----------------------------------------------------------------------
diff --git a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStartegyTest.java
b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStartegyTest.java
new file mode 100644
index 0000000..7a0f848
--- /dev/null
+++ b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStartegyTest.java
@@ -0,0 +1,22 @@
+package org.apache.streams.util.api.requests.backoff;
+
+import org.apache.streams.util.api.requests.backoff.impl.LinearTimeBackOffStrategy;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit Tests
+ */
+public class LinearTimeBackOffStartegyTest {
+
+    @Test
+    public void linearTimeBackOffStrategyTest() {
+        BackOffStrategy backOff = new LinearTimeBackOffStrategy(1);
+        assertEquals(1000, backOff.calculateBackOffTime(1,1));
+        assertEquals(2000, backOff.calculateBackOffTime(2,1));
+        assertEquals(3000, backOff.calculateBackOffTime(3,1));
+        assertEquals(4000, backOff.calculateBackOffTime(4,1));
+        assertEquals(25000, backOff.calculateBackOffTime(5,5));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
----------------------------------------------------------------------
diff --git a/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
b/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
new file mode 100644
index 0000000..903cc69
--- /dev/null
+++ b/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
@@ -0,0 +1,176 @@
+package org.apache.streams.util.oauth.tokens.tokenmanager;
+
+
+import org.apache.streams.util.oauth.tokens.OauthToken;
+import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests for BasicTokenManger
+ */
+public class TestBasicTokenManager {
+
+    /**
+     * Simple token for testing purposes
+     */
+    private class TestToken extends OauthToken {
+
+        private String s;
+
+        public TestToken(String s) {
+            this.s = s;
+        }
+
+        @Override
+        protected boolean internalEquals(Object o) {
+            if(!(o instanceof TestToken))
+                return false;
+            TestToken that = (TestToken) o;
+            return this.s.equals(that.s);
+        }
+    }
+
+    @Test
+    public void testNoArgConstructor() {
+        try {
+            BasicTokenManger manager = new BasicTokenManger<TestToken>();
+            assertEquals(0, manager.numAvailableTokens());
+        } catch (Throwable t) {
+            fail("Constructors threw error: "+t.getMessage());
+        }
+    }
+
+    @Test
+    public void testCollectionConstructor() {
+        List<TestToken> tokens = new LinkedList<TestToken>();
+        try {
+            BasicTokenManger manager1 = new BasicTokenManger<TestToken>(tokens);
+            tokens.add(new TestToken("a"));
+            tokens.add(new TestToken("b"));
+            assertEquals(0, manager1.numAvailableTokens());
+            BasicTokenManger manager2 = new BasicTokenManger<TestToken>(tokens);
+            assertEquals(2, manager2.numAvailableTokens());
+            assertEquals(0, manager1.numAvailableTokens());
+        } catch (Throwable t) {
+            fail("Constructors threw error: "+t.getMessage());
+        }
+    }
+
+    @Test
+    public void testAddTokenToPool() {
+        BasicTokenManger<TestToken> manager = new BasicTokenManger<TestToken>();
+        assertTrue(manager.addTokenToPool(new TestToken("a")));
+        assertEquals(1, manager.numAvailableTokens());
+        assertFalse(manager.addTokenToPool(new TestToken("a")));
+        assertEquals(1, manager.numAvailableTokens());
+        assertTrue(manager.addTokenToPool(new TestToken("b")));
+        assertEquals(2, manager.numAvailableTokens());
+    }
+
+    @Test
+    public void testAddAllTokensToPool() {
+        BasicTokenManger<TestToken> manager = new BasicTokenManger<TestToken>();
+        List<TestToken> tokens = new ArrayList<TestToken>();
+        tokens.add(new TestToken("a"));
+        tokens.add(new TestToken("b"));
+        tokens.add(new TestToken("c"));
+        assertTrue(manager.addAllTokensToPool(tokens));
+        assertEquals(3, manager.numAvailableTokens());
+        assertFalse(manager.addAllTokensToPool(tokens));
+        assertEquals(3, manager.numAvailableTokens());
+        tokens.add(new TestToken("d"));
+        assertTrue(manager.addAllTokensToPool(tokens));
+        assertEquals(4, manager.numAvailableTokens());
+    }
+
+    @Test
+    public void testGetNextAvailableToken() {
+        BasicTokenManger manager = new BasicTokenManger<TestToken>();
+        assertNull(manager.getNextAvailableToken());
+        TestToken tokenA = new TestToken("a");
+        assertTrue(manager.addTokenToPool(tokenA));
+        assertEquals(tokenA, manager.getNextAvailableToken());
+        assertEquals(tokenA, manager.getNextAvailableToken());
+        assertEquals(tokenA, manager.getNextAvailableToken());
+
+        TestToken tokenB = new TestToken("b");
+        TestToken tokenC = new TestToken("c");
+        assertTrue(manager.addTokenToPool(tokenB));
+        assertTrue(manager.addTokenToPool(tokenC));
+        assertEquals(tokenA, manager.getNextAvailableToken());
+        assertEquals(tokenB, manager.getNextAvailableToken());
+        assertEquals(tokenC, manager.getNextAvailableToken());
+        assertEquals(tokenA, manager.getNextAvailableToken());
+        assertEquals(tokenB, manager.getNextAvailableToken());
+        assertEquals(tokenC, manager.getNextAvailableToken());
+    }
+
+    @Test
+    public void testMultiThreadSafety() {
+        int numThreads = 10;
+        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch finishLatch = new CountDownLatch(numThreads);
+        BasicTokenManger<TestToken> manager = new BasicTokenManger<TestToken>();
+        for(int i=0; i < numThreads; ++i) {
+            assertTrue(manager.addTokenToPool(new TestToken(String.valueOf(i))));
+        }
+        for(int i=0; i < numThreads; ++i) {
+            executor.submit(new TestThread(manager, startLatch, finishLatch, numThreads));
+        }
+        try {
+            Thread.sleep(2000); //sleep for 2 seconds so other threads can initialize
+            startLatch.countDown();
+            finishLatch.await();
+            assertTrue("No errors were thrown during thead safe check", true);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        } catch (Throwable t) {
+            fail("Error occured durring thread safe test : "+t.getMessage());
+        }
+    }
+
+    /**
+     * Test class for thread safe check.
+     */
+    private class TestThread implements Runnable {
+
+        private BasicTokenManger<TestToken> manager;
+        private CountDownLatch startLatch;
+        private CountDownLatch finishedLatch;
+        private int availableTokens;
+
+        public TestThread(BasicTokenManger<TestToken> manager, CountDownLatch startLatch, CountDownLatch finishedLatch, int availableTokens) {
+            this.manager = manager;
+            this.startLatch = startLatch;
+            this.finishedLatch = finishedLatch;
+            this.availableTokens = availableTokens;
+        }
+
+        @Override
+        public void run() {
+            try {
+                this.startLatch.await();
+                for(int i=0; i < 1000; ++i) {
+                    assertNotNull(this.manager.getNextAvailableToken());
+                    assertEquals(this.availableTokens, this.manager.numAvailableTokens());
+                }
+                this.finishedLatch.countDown();
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            } catch (Throwable t) {
+                fail("Threw error in multithread test : "+t.getMessage());
+            }
+        }
+    }
+
+}


Mime
View raw message