streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From smar...@apache.org
Subject [4/7] incubator-streams git commit: STREAMS-441: Remove compile dependency on guava for core packages, this closes apache/incubator-streams#320
Date Sat, 26 Nov 2016 18:08:05 GMT
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
index be59bd7..a665023 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
@@ -36,7 +36,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.typesafe.config.ConfigRenderOptions;
-
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,7 +85,7 @@ public abstract class FacebookProvider implements StreamsProvider {
    * FacebookProvider constructor - uses supplied FacebookConfiguration.
    */
   public FacebookProvider(FacebookConfiguration configuration) {
-    this.configuration = (FacebookConfiguration) SerializationUtil.cloneBySerialization(configuration);
+    this.configuration = SerializationUtil.cloneBySerialization(configuration);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java
index 3939f23..4c14f75 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java
@@ -31,7 +31,14 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigRenderOptions;
-
+import facebook4j.Facebook;
+import facebook4j.FacebookException;
+import facebook4j.FacebookFactory;
+import facebook4j.Friend;
+import facebook4j.Paging;
+import facebook4j.ResponseList;
+import facebook4j.User;
+import facebook4j.conf.ConfigurationBuilder;
 import org.apache.commons.lang.NotImplementedException;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -51,15 +58,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import facebook4j.Facebook;
-import facebook4j.FacebookException;
-import facebook4j.FacebookFactory;
-import facebook4j.Friend;
-import facebook4j.Paging;
-import facebook4j.ResponseList;
-import facebook4j.User;
-import facebook4j.conf.ConfigurationBuilder;
-
 
 public class FacebookUserInformationProvider implements StreamsProvider, Serializable {
 
@@ -73,7 +71,7 @@ public class FacebookUserInformationProvider implements StreamsProvider, Seriali
   private FacebookUserInformationConfiguration facebookUserInformationConfiguration;
 
   private Class klass;
-  protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+  protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<>();
 
   public FacebookUserInformationConfiguration getConfig() {
     return facebookUserInformationConfiguration;
@@ -95,7 +93,7 @@ public class FacebookUserInformationProvider implements StreamsProvider, Seriali
   private static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
     return new ThreadPoolExecutor(numThreads, numThreads,
         5000L, TimeUnit.MILLISECONDS,
-        new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+        new ArrayBlockingQueue<>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
   }
 
   /**
@@ -108,7 +106,6 @@ public class FacebookUserInformationProvider implements StreamsProvider, Seriali
       facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
     } catch (IOException ex) {
       ex.printStackTrace();
-      return;
     }
   }
 
@@ -166,9 +163,7 @@ public class FacebookUserInformationProvider implements StreamsProvider, Seriali
       providerQueue.add(
           new StreamsDatum(json, DateTime.now())
       );
-    } catch (JsonProcessingException ex) {
-      ex.printStackTrace();
-    } catch (FacebookException ex) {
+    } catch (JsonProcessingException | FacebookException ex) {
       ex.printStackTrace();
     }
 
@@ -247,8 +242,7 @@ public class FacebookUserInformationProvider implements StreamsProvider, Seriali
     this.start = start;
     this.end = end;
     readCurrent();
-    StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
-    return result;
+    return (StreamsResultSet)providerQueue.iterator();
   }
 
   @Override
@@ -287,8 +281,8 @@ public class FacebookUserInformationProvider implements StreamsProvider, Seriali
     Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getUserAccessToken());
     Preconditions.checkNotNull(facebookUserInformationConfiguration.getInfo());
 
-    List<String> ids = new ArrayList<String>();
-    List<String[]> idsBatches = new ArrayList<String[]>();
+    List<String> ids = new ArrayList<>();
+    List<String[]> idsBatches = new ArrayList<>();
 
     for (String s : facebookUserInformationConfiguration.getInfo()) {
       if (s != null) {
@@ -298,7 +292,7 @@ public class FacebookUserInformationProvider implements StreamsProvider, Seriali
           // add the batch
           idsBatches.add(ids.toArray(new String[ids.size()]));
           // reset the Ids
-          ids = new ArrayList<String>();
+          ids = new ArrayList<>();
         }
 
       }
@@ -322,9 +316,8 @@ public class FacebookUserInformationProvider implements StreamsProvider, Seriali
         .setClientVersion("v1.0");
 
     FacebookFactory ff = new FacebookFactory(cb.build());
-    Facebook facebook = ff.getInstance();
 
-    return facebook;
+    return ff.getInstance();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
index b292d30..d198e43 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
@@ -29,14 +29,19 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.util.ComponentUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigRenderOptions;
-
+import facebook4j.Facebook;
+import facebook4j.FacebookException;
+import facebook4j.FacebookFactory;
+import facebook4j.Post;
+import facebook4j.ResponseList;
+import facebook4j.conf.ConfigurationBuilder;
+import facebook4j.json.DataObjectFactory;
 import org.apache.commons.lang.NotImplementedException;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -47,6 +52,7 @@ import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -58,17 +64,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import facebook4j.Facebook;
-import facebook4j.FacebookException;
-import facebook4j.FacebookFactory;
-import facebook4j.Post;
-import facebook4j.ResponseList;
-import facebook4j.conf.ConfigurationBuilder;
-import facebook4j.json.DataObjectFactory;
-
 public class FacebookUserstreamProvider implements StreamsProvider, Serializable {
 
-  public static final String STREAMS_ID = "FacebookUserstreamProvider";
+  private static final String STREAMS_ID = "FacebookUserstreamProvider";
 
   private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserstreamProvider.class);
 
@@ -80,7 +78,7 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
   private Class klass;
   protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-  protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+  protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<>();
 
   public FacebookUserstreamConfiguration getConfig() {
     return configuration;
@@ -105,7 +103,7 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
   private static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
     return new ThreadPoolExecutor(numThreads, numThreads,
         5000L, TimeUnit.MILLISECONDS,
-        new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+        new ArrayBlockingQueue<>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
   }
 
   /**
@@ -118,7 +116,6 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
       facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
     } catch (IOException ex) {
       ex.printStackTrace();
-      return;
     }
   }
 
@@ -214,8 +211,7 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
     this.start = start;
     this.end = end;
     readCurrent();
-    StreamsResultSet result = (StreamsResultSet) providerQueue.iterator();
-    return result;
+    return (StreamsResultSet) providerQueue.iterator();
   }
 
   @Override
@@ -247,11 +243,11 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
 
     executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
 
-    Preconditions.checkNotNull(providerQueue);
-    Preconditions.checkNotNull(this.klass);
-    Preconditions.checkNotNull(configuration.getOauth().getAppId());
-    Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
-    Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
+    Objects.requireNonNull(providerQueue);
+    Objects.requireNonNull(this.klass);
+    Objects.requireNonNull(configuration.getOauth().getAppId());
+    Objects.requireNonNull(configuration.getOauth().getAppSecret());
+    Objects.requireNonNull(configuration.getOauth().getUserAccessToken());
 
     client = getFacebookClient();
 
@@ -259,8 +255,8 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
          &&
          configuration.getInfo().size() > 0 ) {
 
-      List<String> ids = new ArrayList<String>();
-      List<String[]> idsBatches = new ArrayList<String[]>();
+      List<String> ids = new ArrayList<>();
+      List<String[]> idsBatches = new ArrayList<>();
 
       for (String s : configuration.getInfo()) {
         if (s != null) {
@@ -270,7 +266,7 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
             // add the batch
             idsBatches.add(ids.toArray(new String[ids.size()]));
             // reset the Ids
-            ids = new ArrayList<String>();
+            ids = new ArrayList<>();
           }
 
         }
@@ -288,9 +284,8 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
         .setJSONStoreEnabled(true);
 
     FacebookFactory ff = new FacebookFactory(cb.build());
-    Facebook facebook = ff.getInstance();
 
-    return facebook;
+    return ff.getInstance();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
index 68d8f06..5f697f7 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
@@ -26,16 +26,14 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
-
+import facebook4j.FacebookException;
+import facebook4j.Page;
+import facebook4j.json.DataObjectFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.BlockingQueue;
 
-import facebook4j.FacebookException;
-import facebook4j.Page;
-import facebook4j.json.DataObjectFactory;
-
 /**
  * Collects the page data from public Facebook pages.
  */
@@ -64,8 +62,7 @@ public class FacebookPageDataCollector extends FacebookDataCollector {
     while (attempt < MAX_ATTEMPTS) {
       ++attempt;
       try {
-        Page page = getNextFacebookClient().getPage(pageId);
-        return page;
+        return getNextFacebookClient().getPage(pageId);
       } catch (FacebookException fe) {
         LOGGER.error("Facebook returned an exception : {}", fe);
         LOGGER.error("Facebook returned an exception while trying to get feed for page, {} : {}", pageId, fe.getMessage());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
index e7bbcfa..ff57839 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
@@ -36,7 +36,6 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,7 +43,6 @@ import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
-import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -122,9 +120,7 @@ public class FacebookPageProvider extends FacebookProvider {
     provider.startStream();
     do {
       Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-      Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
-      while (iterator.hasNext()) {
-        StreamsDatum datum = iterator.next();
+      for (StreamsDatum datum : provider.readCurrent()) {
         String json;
         try {
           json = MAPPER.writeValueAsString(datum.getDocument());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java
index f509170..5cf8b3e 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java
@@ -25,18 +25,16 @@ import org.apache.streams.facebook.provider.FacebookDataCollector;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.BlockingQueue;
-
 import facebook4j.FacebookException;
 import facebook4j.Paging;
 import facebook4j.Post;
 import facebook4j.Reading;
 import facebook4j.ResponseList;
 import facebook4j.json.DataObjectFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
 
 /**
  * Collects the page feed data from public Facebook pages.

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java
index 5d977e0..3e7d8d9 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java
@@ -35,7 +35,6 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +42,6 @@ import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
-import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -98,9 +96,7 @@ public class FacebookPageFeedProvider extends FacebookProvider {
     provider.startStream();
     do {
       Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-      Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
-      while (iterator.hasNext()) {
-        StreamsDatum datum = iterator.next();
+      for (StreamsDatum datum : provider.readCurrent()) {
         String json;
         try {
           json = MAPPER.writeValueAsString(datum.getDocument());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/serializer/FacebookActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/serializer/FacebookActivityUtil.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/serializer/FacebookActivityUtil.java
index 93ac199..a81719c 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/serializer/FacebookActivityUtil.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/serializer/FacebookActivityUtil.java
@@ -22,6 +22,7 @@ import org.apache.streams.data.util.ActivityUtil;
 import org.apache.streams.exceptions.ActivitySerializerException;
 import org.apache.streams.facebook.Cover;
 import org.apache.streams.facebook.Datum;
+import org.apache.streams.facebook.Like;
 import org.apache.streams.facebook.Location;
 import org.apache.streams.facebook.Page;
 import org.apache.streams.facebook.Place;
@@ -37,9 +38,7 @@ import org.apache.streams.pojo.json.Provider;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +46,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * FacebookActivityUtil helps convert facebook data to activity formats.
@@ -93,9 +93,9 @@ public class FacebookActivityUtil {
   }
 
   /**
-   * Builds out the {@link org.apache.streams.pojo.json.ActivityObject} from the given {@link Post}.
+   * Builds out the {@link ActivityObject} from the given {@link Post}.
    * @param post
-   * @return {@link org.apache.streams.pojo.json.ActivityObject}
+   * @return {@link ActivityObject}
    */
   public static ActivityObject buildObject(Post post) {
     ActivityObject activityObject = new ActivityObject();
@@ -122,7 +122,7 @@ public class FacebookActivityUtil {
   }
 
   /**
-   * Gets the common facebook {@link org.apache.streams.pojo.json.Provider} object.
+   * Gets the common facebook {@link Provider} object.
    * @return a provider object representing Facebook
    */
   public static Provider getProvider() {
@@ -134,17 +134,15 @@ public class FacebookActivityUtil {
   }
 
   /**
-   * Builds the activity {@link org.apache.streams.pojo.json.ActivityObject} actor from the Page.
+   * Builds the activity {@link ActivityObject} actor from the Page.
    * @param page the object to use as the source
    * @return a valid Actor populated from the Page
    */
   public static ActivityObject buildActor(Page page) {
     ActivityObject actor = new ActivityObject();
     actor.setId(formatId(
-        Optional.fromNullable(
-            page.getId())
-            .or(Optional.of(page.getId().toString()))
-            .orNull()
+        Optional.ofNullable(Optional.ofNullable(page.getId())
+            .orElseGet(Optional.of(page.getId())::get)).orElse(null)
     ));
 
     actor.setDisplayName(page.getName());
@@ -169,19 +167,17 @@ public class FacebookActivityUtil {
   }
 
   /**
-   * Builds an {@link org.apache.streams.pojo.json.ActivityObject} object from the {@link Post}.
+   * Builds an {@link ActivityObject} object from the {@link Post}.
    * @param post post
-   * @return {@link org.apache.streams.pojo.json.ActivityObject}
+   * @return {@link ActivityObject}
    */
   public static ActivityObject buildActor(Post post) {
     ActivityObject actor = new ActivityObject();
 
     try {
       actor.setId(formatId(
-          Optional.fromNullable(
-              post.getFrom().getId())
-              .or(Optional.of(post.getFrom().getId()))
-              .orNull()
+          Optional.ofNullable(Optional.ofNullable(post.getFrom().getId())
+              .orElseGet(Optional.of(post.getFrom().getId())::get)).orElse(null)
       ));
 
       actor.setDisplayName(post.getFrom().getName());
@@ -215,7 +211,7 @@ public class FacebookActivityUtil {
     }
 
     /**
-     * Fills out the extensions attribute of the passed in {@link org.apache.streams.pojo.json.Activity}
+     * Fills out the extensions attribute of the passed in {@link Activity}
      * @param activity
      * @param post
      */
@@ -226,7 +222,7 @@ public class FacebookActivityUtil {
 
         if(post.getLikes() != null && post.getLikes().size() > 0) {
             Map<String, Object> likes = new HashMap<>();
-            org.apache.streams.facebook.Like like = post.getLikes().get(0);
+            Like like = post.getLikes().get(0);
 
             if(like.getAdditionalProperties().containsKey("data")) {
                 extensions.put("likes", likes);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/SimplePageTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/SimplePageTest.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/SimplePageTest.java
index 36d3b16..b7009e2 100644
--- a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/SimplePageTest.java
+++ b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/SimplePageTest.java
@@ -28,7 +28,6 @@ import org.apache.streams.pojo.json.Activity;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -87,7 +86,7 @@ public class SimplePageTest {
   public void TestDeserialization() throws Exception {
     Page page = mapper.convertValue(event, Page.class);
 
-    Activity activity = null;
+    Activity activity;
     activity = facebookPageActivitySerializer.deserialize(page);
 
     assertThat(activity, is(not(nullValue())));

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/TestPage.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/TestPage.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/TestPage.java
index 620f31f..893509e 100644
--- a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/TestPage.java
+++ b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/TestPage.java
@@ -17,13 +17,13 @@
  */
 package org.apache.streams.facebook.test;
 
-import java.net.URL;
-import java.util.Date;
-
 import facebook4j.Cover;
 import facebook4j.Page;
 import facebook4j.Place;
 
+import java.net.URL;
+import java.util.Date;
+
 public class TestPage implements Page {
   private String id;
   private String name;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookActivityActorSerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookActivityActorSerDeIT.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookActivityActorSerDeIT.java
index 7b50535..4133b43 100644
--- a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookActivityActorSerDeIT.java
+++ b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookActivityActorSerDeIT.java
@@ -26,7 +26,6 @@ import org.apache.streams.pojo.json.Activity;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.input.BoundedInputStream;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookActivitySerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookActivitySerDeIT.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookActivitySerDeIT.java
index 8112d59..7ad9388 100644
--- a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookActivitySerDeIT.java
+++ b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookActivitySerDeIT.java
@@ -26,7 +26,6 @@ import org.apache.streams.pojo.json.Activity;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.input.BoundedInputStream;
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPageSerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPageSerDeIT.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPageSerDeIT.java
index 2a2ef6e..d90ea3c 100644
--- a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPageSerDeIT.java
+++ b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPageSerDeIT.java
@@ -24,7 +24,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.input.BoundedInputStream;
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPostSerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPostSerDeIT.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPostSerDeIT.java
index 3ae6749..6ed1d55 100644
--- a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPostSerDeIT.java
+++ b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPostSerDeIT.java
@@ -26,7 +26,6 @@ import org.apache.streams.pojo.json.Activity;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.input.BoundedInputStream;
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/page/FacebookPageProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/page/FacebookPageProviderIT.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/page/FacebookPageProviderIT.java
index f78f2e5..b280eae 100644
--- a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/page/FacebookPageProviderIT.java
+++ b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/page/FacebookPageProviderIT.java
@@ -21,7 +21,6 @@ package org.apache.streams.facebook.test.providers.page;
 import org.apache.streams.facebook.provider.page.FacebookPageProvider;
 
 import com.google.common.collect.Lists;
-
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +43,7 @@ public class FacebookPageProviderIT {
     args[0] = configfile;
     args[1] = outfile;
 
-    Thread testThread = new Thread((Runnable) () -> {
+    Thread testThread = new Thread(() -> {
       try {
         FacebookPageProvider.main(args);
       } catch( Exception e ) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java
index a0aac9b..168aed7 100644
--- a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java
@@ -23,7 +23,6 @@ import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.util.ComponentUtils;
 
 import com.googlecode.gmail4j.GmailMessage;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java
index acc745d..3758a5f 100644
--- a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java
@@ -41,7 +41,6 @@ import com.googlecode.gmail4j.javamail.JavaMailGmailMessage;
 import com.sun.mail.imap.IMAPFolder;
 import com.sun.mail.imap.IMAPMessage;
 import com.sun.mail.imap.IMAPSSLStore;
-
 import org.apache.commons.lang.NotImplementedException;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -49,7 +48,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
-
 import javax.mail.internet.MimeMultipart;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
index e11628f..bd21acc 100644
--- a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
@@ -25,7 +25,6 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 
-import com.google.common.base.Preconditions;
 import com.google.gmail.GMailConfiguration;
 import com.googlecode.gmail4j.GmailClient;
 import com.googlecode.gmail4j.GmailConnection;
@@ -33,13 +32,13 @@ import com.googlecode.gmail4j.http.HttpGmailConnection;
 import com.googlecode.gmail4j.javamail.ImapGmailClient;
 import com.googlecode.gmail4j.javamail.ImapGmailConnection;
 import com.googlecode.gmail4j.rss.RssGmailClient;
-
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.math.BigInteger;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -89,7 +88,7 @@ public class GMailProvider implements StreamsProvider, Serializable {
   private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
     return new ThreadPoolExecutor(nThreads, nThreads,
         5000L, TimeUnit.MILLISECONDS,
-        new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+        new ArrayBlockingQueue<>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
   }
 
   public GMailProvider() {
@@ -162,10 +161,10 @@ public class GMailProvider implements StreamsProvider, Serializable {
   @Override
   public void prepare(Object configurationObject) {
 
-    Preconditions.checkNotNull(this.klass);
+    Objects.requireNonNull(this.klass);
 
-    Preconditions.checkNotNull(config.getUserName());
-    Preconditions.checkNotNull(config.getPassword());
+    Objects.requireNonNull(config.getUserName());
+    Objects.requireNonNull(config.getPassword());
 
     rssClient = new RssGmailClient();
     GmailConnection rssConnection = new HttpGmailConnection(config.getUserName(), config.getPassword().toCharArray());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java b/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
index 2da9e82..3a611f7 100644
--- a/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
+++ b/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
@@ -20,7 +20,6 @@ package com.google.gmail.test;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
index fe4d5da..ae9da2a 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
@@ -30,7 +30,6 @@ import com.google.gplus.serializer.util.GPlusActivityDeserializer;
 import com.google.gplus.serializer.util.GPlusEventClassifier;
 import com.google.gplus.serializer.util.GPlusPersonDeserializer;
 import com.google.gplus.serializer.util.GooglePlusActivityUtil;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,7 +47,6 @@ public class GooglePlusTypeConverter implements StreamsProcessor {
   private StreamsJacksonMapper mapper;
   private Queue<Person> inQueue;
   private Queue<StreamsDatum> outQueue;
-  private GooglePlusActivityUtil googlePlusActivityUtil;
   private int count = 0;
 
   public GooglePlusTypeConverter() {}
@@ -82,10 +80,10 @@ public class GooglePlusTypeConverter implements StreamsProcessor {
 
       if (item instanceof Person) {
         activity = new Activity();
-        googlePlusActivityUtil.updateActivity((Person)item, activity);
+        GooglePlusActivityUtil.updateActivity((Person)item, activity);
       } else if (item instanceof com.google.api.services.plus.model.Activity) {
         activity = new Activity();
-        googlePlusActivityUtil.updateActivity((com.google.api.services.plus.model.Activity)item, activity);
+        GooglePlusActivityUtil.updateActivity((com.google.api.services.plus.model.Activity)item, activity);
       }
 
       if (activity != null) {
@@ -122,7 +120,6 @@ public class GooglePlusTypeConverter implements StreamsProcessor {
 
   @Override
   public void prepare(Object configurationObject) {
-    googlePlusActivityUtil = new GooglePlusActivityUtil();
     mapper = StreamsJacksonMapper.getInstance();
 
     SimpleModule simpleModule = new SimpleModule();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
index e08c571..d9c7dd7 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
@@ -29,20 +29,17 @@ import org.apache.streams.util.ComponentUtils;
 import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
 import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
 
-import com.google.api.client.googleapis.auth.oauth2.GoogleClientSecrets;
 import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
 import com.google.api.client.http.HttpTransport;
 import com.google.api.client.http.javanet.NetHttpTransport;
 import com.google.api.client.json.jackson2.JacksonFactory;
 import com.google.api.services.plus.Plus;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.gson.Gson;
-
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +53,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
@@ -88,11 +86,9 @@ public abstract class AbstractGPlusProvider implements StreamsProvider {
   private ListeningExecutorService executor;
 
   private BlockingQueue<StreamsDatum> datumQueue;
-  private BlockingQueue<Runnable> runnables;
   private AtomicBoolean isComplete;
   private boolean previousPullWasEmpty;
 
-  protected GoogleClientSecrets clientSecrets;
   protected GoogleCredential credential;
   protected Plus plus;
 
@@ -108,9 +104,9 @@ public abstract class AbstractGPlusProvider implements StreamsProvider {
   @Override
   public void prepare(Object configurationObject) {
 
-    Preconditions.checkNotNull(config.getOauth().getPathToP12KeyFile());
-    Preconditions.checkNotNull(config.getOauth().getAppName());
-    Preconditions.checkNotNull(config.getOauth().getServiceAccountEmailAddress());
+    Objects.requireNonNull(config.getOauth().getPathToP12KeyFile());
+    Objects.requireNonNull(config.getOauth().getAppName());
+    Objects.requireNonNull(config.getOauth().getServiceAccountEmailAddress());
 
     try {
       this.plus = createPlusClient();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
index 20f5002..eda55d2 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
@@ -22,7 +22,6 @@ import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.pojo.json.Activity;
 
 import com.google.gplus.serializer.util.GooglePlusActivityUtil;
-
 import org.apache.commons.lang.NotImplementedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java
index edbc663..351e5be 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java
@@ -22,7 +22,6 @@ import org.apache.streams.util.api.requests.backoff.BackOffException;
 import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
 
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
index 5585bfc..31d8b5c 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
@@ -23,7 +23,6 @@ import org.apache.streams.google.gplus.configuration.UserInfo;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
 
-import com.fasterxml.jackson.core.JsonGenerationException;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -35,7 +34,6 @@ import com.google.api.services.plus.Plus;
 import com.google.api.services.plus.model.Activity;
 import com.google.api.services.plus.model.ActivityFeed;
 import com.google.gplus.serializer.util.GPlusActivityDeserializer;
-
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java
index 3da3468..e0df58d 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java
@@ -30,7 +30,6 @@ import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.services.plus.Plus;
 import com.google.api.services.plus.model.Person;
 import com.google.gplus.serializer.util.GPlusPersonDeserializer;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,7 +72,7 @@ public  class GPlusUserDataCollector extends GPlusDataCollector {
 
   protected void queueUserHistory() {
     try {
-      boolean tryAgain = false;
+      boolean tryAgain;
       int attempts = 0;
       com.google.api.services.plus.model.Person person = null;
       do {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusActivityDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusActivityDeserializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusActivityDeserializer.java
index 79f1815..7468de4 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusActivityDeserializer.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusActivityDeserializer.java
@@ -27,7 +27,6 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.google.api.client.util.DateTime;
 import com.google.api.client.util.Lists;
 import com.google.api.services.plus.model.Activity;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusCommentDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusCommentDeserializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusCommentDeserializer.java
index d143419..8c5ade5 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusCommentDeserializer.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusCommentDeserializer.java
@@ -30,7 +30,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.api.client.util.DateTime;
 import com.google.api.client.util.Lists;
 import com.google.api.services.plus.model.Comment;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusPersonDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusPersonDeserializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusPersonDeserializer.java
index f70335b..b7b4e24 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusPersonDeserializer.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusPersonDeserializer.java
@@ -27,10 +27,8 @@ import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.JsonDeserializer;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.IntNode;
 import com.google.api.client.util.Lists;
 import com.google.api.services.plus.model.Person;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,7 +60,7 @@ public class GPlusPersonDeserializer extends JsonDeserializer<Person> {
     Person person = new Person();
     try {
 
-      person.setCircledByCount((Integer) ((IntNode) node.get("circledByCount")).numberValue());
+      person.setCircledByCount((Integer) (node.get("circledByCount")).numberValue());
       person.setDisplayName(node.get("displayName").asText());
       person.setEtag(node.get("etag").asText());
       person.setGender(node.get("gender").asText());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GooglePlusActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GooglePlusActivityUtil.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GooglePlusActivityUtil.java
index 1293d18..c980b98 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GooglePlusActivityUtil.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GooglePlusActivityUtil.java
@@ -29,9 +29,7 @@ import org.apache.streams.pojo.json.Provider;
 import com.google.api.services.plus.model.Comment;
 import com.google.api.services.plus.model.Person;
 import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
-
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +38,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * GooglePlusActivityUtil helps convert c.g.Person and c.g.Activity into o.a.s.p.j.o.Page and o.a.s.p.j.Activity.
@@ -49,8 +48,8 @@ public class GooglePlusActivityUtil {
   private static final Logger LOGGER = LoggerFactory.getLogger(GooglePlusActivityUtil.class);
 
   /**
-   * Given a {@link com.google.api.services.plus.model.Person} object and an
-   * {@link org.apache.streams.pojo.json.Activity} object, fill out the appropriate details.
+   * Given a {@link Person} object and an
+   * {@link Activity} object, fill out the appropriate details.
    *
    * @param item Person
    * @param activity Activity
@@ -60,17 +59,14 @@ public class GooglePlusActivityUtil {
     activity.setActor(buildActor(item));
     activity.setVerb("update");
 
-    activity.setId(formatId(activity.getVerb(),
-        Optional.fromNullable(
-            item.getId())
-            .orNull()));
+    activity.setId(formatId(activity.getVerb(), Optional.ofNullable(item.getId()).orElse(null)));
 
     activity.setProvider(getProvider());
   }
 
   /**
-   * Given a {@link List} of {@link com.google.api.services.plus.model.Comment} objects and an
-   * {@link org.apache.streams.pojo.json.Activity}, update that Activity to contain all comments
+   * Given a {@link List} of {@link Comment} objects and an
+   * {@link Activity}, update that Activity to contain all comments
    *
    * @param comments input List of Comment
    * @param activity output Activity
@@ -86,7 +82,7 @@ public class GooglePlusActivityUtil {
 
   /**
    * Given a Google Plus {@link com.google.api.services.plus.model.Activity},
-   * convert that into an Activity streams formatted {@link org.apache.streams.pojo.json.Activity}
+   * convert that into an Activity streams formatted {@link Activity}
    *
    * @param gPlusActivity input c.g.a.s.p.m.Activity
    * @param activity output o.a.s.p.j.Activity
@@ -102,10 +98,7 @@ public class GooglePlusActivityUtil {
       activity.setContent(gPlusActivity.getObject().getContent());
     }
 
-    activity.setId(formatId(activity.getVerb(),
-        Optional.fromNullable(
-            gPlusActivity.getId())
-            .orNull()));
+    activity.setId(formatId(activity.getVerb(), Optional.ofNullable(gPlusActivity.getId()).orElse(null)));
 
     DateTime published = new DateTime(String.valueOf(gPlusActivity.getPublished()));
     activity.setPublished(published);
@@ -115,8 +108,8 @@ public class GooglePlusActivityUtil {
   }
 
   /**
-   * Adds a single {@link com.google.api.services.plus.model.Comment} to the Object.Attachments
-   * section of the passed in {@link org.apache.streams.pojo.json.Activity}
+   * Adds a single {@link Comment} to the Object.Attachments
+   * section of the passed in {@link Activity}
    *
    * @param activity output o.a.s.p.j.Activity
    * @param comment input c.g.a.s.p.m.Comment
@@ -139,7 +132,7 @@ public class GooglePlusActivityUtil {
       activity.setObject(new ActivityObject());
     }
     if (activity.getObject().getAttachments() == null) {
-      activity.getObject().setAttachments(new ArrayList<ActivityObject>());
+      activity.getObject().setAttachments(new ArrayList<>());
     }
 
     activity.getObject().getAttachments().add(obj);
@@ -147,7 +140,7 @@ public class GooglePlusActivityUtil {
 
   /**
    * Add in necessary extensions from the passed in {@link com.google.api.services.plus.model.Activity} to the
-   * {@link org.apache.streams.pojo.json.Activity} object
+   * {@link Activity} object
    *
    * @param activity output o.a.s.p.j.Activity
    * @param gPlusActivity input c.g.a.s.p.m.Activity
@@ -180,7 +173,7 @@ public class GooglePlusActivityUtil {
   }
 
   /**
-   * Set the {@link org.apache.streams.pojo.json.ActivityObject} field given the passed in
+   * Set the {@link ActivityObject} field given the passed in
    * {@link com.google.api.services.plus.model.Activity.PlusObject}
    *
    * @param activity output $.object as o.a.s.p.j.ActivityObject
@@ -193,7 +186,7 @@ public class GooglePlusActivityUtil {
       activityObject.setContent(plusObject.getContent());
       activityObject.setObjectType(plusObject.getObjectType());
 
-      java.util.List<ActivityObject> attachmentsList = new ArrayList<>();
+      List<ActivityObject> attachmentsList = new ArrayList<>();
       for (com.google.api.services.plus.model.Activity.PlusObject.Attachments attachments : plusObject.getAttachments()) {
         ActivityObject attach = new ActivityObject();
 
@@ -221,7 +214,7 @@ public class GooglePlusActivityUtil {
 
   /**
    * Given a {@link com.google.api.services.plus.model.Activity.Actor} object, return a fully fleshed
-   * out {@link org.apache.streams.pojo.json.ActivityObject} actor
+   * out {@link ActivityObject} actor
    *
    * @param gPlusActor input c.g.a.s.p.m.Activity.Actor
    * @return {@link ActivityObject} output $.actor as o.a.s.p.j.ActivityObject
@@ -245,7 +238,7 @@ public class GooglePlusActivityUtil {
   }
 
   /**
-   * Extract the relevant details from the passed in {@link com.google.api.services.plus.model.Person} object and build
+   * Extract the relevant details from the passed in {@link Person} object and build
    * an actor with them
    *
    * @param person Person
@@ -282,7 +275,7 @@ public class GooglePlusActivityUtil {
   }
 
   /**
-   * Gets the common googleplus {@link org.apache.streams.pojo.json.Provider} object
+   * Gets the common googleplus {@link Provider} object
    * @return a provider object representing GooglePlus
    */
   public static Provider getProvider() {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/GooglePlusCommentSerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/GooglePlusCommentSerDeIT.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/GooglePlusCommentSerDeIT.java
index 28b4db8..c3e026f 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/GooglePlusCommentSerDeIT.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/GooglePlusCommentSerDeIT.java
@@ -29,7 +29,6 @@ import com.google.api.client.util.Lists;
 import com.google.api.services.plus.model.Comment;
 import com.google.gplus.serializer.util.GPlusCommentDeserializer;
 import com.google.gplus.serializer.util.GooglePlusActivityUtil;
-
 import org.apache.commons.lang.StringUtils;
 import org.junit.Before;
 import org.slf4j.Logger;
@@ -102,7 +101,7 @@ public class GooglePlusCommentSerDeIT {
 
       assertEquals(comments.size(), 3);
 
-      googlePlusActivityUtil.updateActivity(comments, activity);
+      GooglePlusActivityUtil.updateActivity(comments, activity);
       assertNotNull(activity);
       assertNotNull(activity.getObject());
       assertEquals(activity.getObject().getAttachments().size(), 3);
@@ -115,7 +114,7 @@ public class GooglePlusCommentSerDeIT {
   public void testEmptyComments() {
     Activity activity = new Activity();
 
-    googlePlusActivityUtil.updateActivity(new ArrayList<Comment>(), activity);
+    GooglePlusActivityUtil.updateActivity(new ArrayList<>(), activity);
 
     assertNull(activity.getObject());
   }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/GooglePlusPersonSerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/GooglePlusPersonSerDeIT.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/GooglePlusPersonSerDeIT.java
index be54aa1..ab5f8cb 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/GooglePlusPersonSerDeIT.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/GooglePlusPersonSerDeIT.java
@@ -30,7 +30,6 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.api.services.plus.model.Person;
 import com.google.gplus.serializer.util.GPlusPersonDeserializer;
 import com.google.gplus.serializer.util.GooglePlusActivityUtil;
-
 import org.apache.commons.lang.StringUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -81,7 +80,7 @@ public class GooglePlusPersonSerDeIT {
 
           Person person = objectMapper.readValue(line, Person.class);
 
-          googlePlusActivityUtil.updateActivity(person, activity);
+          GooglePlusActivityUtil.updateActivity(person, activity);
           LOGGER.info("activity: {}", activity);
 
           assertNotNull(activity);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/processor/GooglePlusActivitySerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/processor/GooglePlusActivitySerDeIT.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/processor/GooglePlusActivitySerDeIT.java
index d86001c..7b1d71d 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/processor/GooglePlusActivitySerDeIT.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/processor/GooglePlusActivitySerDeIT.java
@@ -30,7 +30,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.gplus.serializer.util.GPlusActivityDeserializer;
 import com.google.gplus.serializer.util.GooglePlusActivityUtil;
-
 import org.apache.commons.lang.StringUtils;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/processor/GooglePlusTypeConverterTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/processor/GooglePlusTypeConverterTest.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/processor/GooglePlusTypeConverterTest.java
index c27351c..ebdeac6 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/processor/GooglePlusTypeConverterTest.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/processor/GooglePlusTypeConverterTest.java
@@ -30,7 +30,6 @@ import com.google.api.services.plus.model.Person;
 import com.google.gplus.serializer.util.GPlusActivityDeserializer;
 import com.google.gplus.serializer.util.GPlusPersonDeserializer;
 import com.google.gplus.serializer.util.GooglePlusActivityUtil;
-
 import org.apache.commons.lang.StringUtils;
 import org.junit.Before;
 import org.junit.Ignore;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestAbstractGPlusProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestAbstractGPlusProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestAbstractGPlusProvider.java
index 4cae4c0..b3590c4 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestAbstractGPlusProvider.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestAbstractGPlusProvider.java
@@ -28,11 +28,10 @@ import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
 import com.carrotsearch.randomizedtesting.RandomizedTest;
 import com.carrotsearch.randomizedtesting.annotations.Repeat;
 import com.google.api.services.plus.Plus;
-import com.google.common.collect.Lists;
-
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 
@@ -50,7 +49,7 @@ public class TestAbstractGPlusProvider extends RandomizedTest {
   @Repeat(iterations = 3)
   public void testDataCollectorRunsPerUser() {
     int numUsers = randomIntBetween(1, 1000);
-    List<UserInfo> userList = Lists.newLinkedList();
+    List<UserInfo> userList = new LinkedList<>();
     for (int i = 0; i < numUsers; ++i) {
       userList.add(new UserInfo());
     }
@@ -71,14 +70,11 @@ public class TestAbstractGPlusProvider extends RandomizedTest {
       @Override
       protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) {
         final BlockingQueue<StreamsDatum> q = queue;
-        return new Runnable() {
-          @Override
-          public void run() {
-            try {
-              q.put(new StreamsDatum(null));
-            } catch (InterruptedException ie) {
-              fail("Test was interrupted");
-            }
+        return () -> {
+          try {
+            q.put(new StreamsDatum(null));
+          } catch (InterruptedException ie) {
+            fail("Test was interrupted");
           }
         };
       }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java
index 3620346..ffff3e0 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java
@@ -33,15 +33,14 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.api.services.plus.Plus;
 import com.google.api.services.plus.model.Activity;
 import com.google.api.services.plus.model.ActivityFeed;
-import com.google.common.collect.Lists;
 import com.google.gplus.serializer.util.GPlusActivityDeserializer;
-
 import org.joda.time.DateTime;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -142,12 +141,7 @@ public class TestGPlusUserActivityCollector extends RandomizedTest {
   private Plus createMockPlus(final int numBefore, final int numAfter, final int numInRange, final DateTime after, final DateTime before) {
     Plus plus = mock(Plus.class);
     final Plus.Activities activities = createMockPlusActivities(numBefore, numAfter, numInRange, after, before);
-    doAnswer(new Answer() {
-      @Override
-      public Plus.Activities answer(InvocationOnMock invocationOnMock) throws Throwable {
-        return activities;
-      }
-    }).when(plus).activities();
+    doAnswer(invocationOnMock -> activities).when(plus).activities();
     return plus;
   }
 
@@ -194,14 +188,14 @@ public class TestGPlusUserActivityCollector extends RandomizedTest {
       DateTime before,
       boolean page) {
     ActivityFeed feed = new ActivityFeed();
-    List<Activity> list = Lists.newLinkedList();
+    List<Activity> list = new LinkedList<>();
     for (int i = 0; i < numAfter; ++i) {
       DateTime published = before.plus(randomIntBetween(0, Integer.MAX_VALUE));
       Activity activity = createActivityWithPublishedDate(published);
       list.add(activity);
     }
     for (int i = 0; i < numInRange; ++i) {
-      DateTime published = null;
+      DateTime published;
       if ((before == null && after == null) || before == null) {
         published = DateTime.now(); // no date range or end time date range so just make the time now.
       } else if (after == null) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java
index 4460fb1..8d30314 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java
@@ -27,7 +27,6 @@ import org.apache.streams.util.api.requests.backoff.impl.ConstantTimeBackOffStra
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.services.plus.Plus;
 import com.google.api.services.plus.model.Person;
-
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -95,24 +94,14 @@ public class TestGPlusUserDataCollector {
 
   private Plus createMockPlus(final int succedOnTry, final Throwable throwable) {
     Plus plus = mock(Plus.class);
-    doAnswer(new Answer() {
-      @Override
-      public Plus.People answer(InvocationOnMock invocationOnMock) throws Throwable {
-        return createMockPeople(succedOnTry, throwable);
-      }
-    }).when(plus).people();
+    doAnswer(invocationOnMock -> createMockPeople(succedOnTry, throwable)).when(plus).people();
     return plus;
   }
 
   private Plus.People createMockPeople(final int succedOnTry, final Throwable throwable) {
     Plus.People people = mock(Plus.People.class);
     try {
-      when(people.get(anyString())).thenAnswer(new Answer<Plus.People.Get>() {
-        @Override
-        public Plus.People.Get answer(InvocationOnMock invocationOnMock) throws Throwable {
-          return createMockGetNoError(succedOnTry, throwable);
-        }
-      });
+      when(people.get(anyString())).thenAnswer(invocationOnMock -> createMockGetNoError(succedOnTry, throwable));
     } catch (IOException ioe) {
       fail("No Excpetion should have been thrown while creating mocks");
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
index 025af18..1e2a714 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
@@ -42,12 +42,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramMediaFeedDataConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramMediaFeedDataConverter.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramMediaFeedDataConverter.java
index c9bbd85..40c3b37 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramMediaFeedDataConverter.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramMediaFeedDataConverter.java
@@ -22,11 +22,11 @@ import org.apache.streams.data.ActivityConverter;
 import org.apache.streams.exceptions.ActivityConversionException;
 import org.apache.streams.pojo.json.Activity;
 
-import com.google.common.collect.Lists;
 import org.apache.commons.lang.NotImplementedException;
 import org.jinstagram.entity.users.feed.MediaFeedData;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.streams.instagram.serializer.util.InstagramActivityUtil.updateActivity;
@@ -66,7 +66,7 @@ public class InstagramMediaFeedDataConverter implements ActivityConverter<MediaF
 
     updateActivity(item, activity);
 
-    return Lists.newArrayList(activity);
+    return Collections.singletonList(activity);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
index dd181de..49e3c01 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
@@ -28,8 +28,6 @@ import org.apache.streams.pojo.json.Image;
 import org.apache.streams.pojo.json.Provider;
 
 import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
 import org.jinstagram.entity.comments.CommentData;
 import org.jinstagram.entity.common.Comments;
 import org.jinstagram.entity.common.ImageData;
@@ -44,9 +42,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Provides utilities for working with Activity objects within the context of Instagram.
@@ -70,9 +70,7 @@ public class InstagramActivityUtil {
     }
 
     activity.setId(formatId(activity.getVerb(),
-        Optional.fromNullable(
-            item.getId())
-            .orNull()));
+        Optional.ofNullable(item.getId()).orElse(null)));
 
     activity.setProvider(getProvider());
     activity.setUrl(item.getLink());
@@ -298,7 +296,7 @@ public class InstagramActivityUtil {
   }
 
   /**
-   * Gets the common instagram {@link org.apache.streams.pojo.json.Provider} object.
+   * Gets the common instagram {@link Provider} object.
    * @return a provider object representing Instagram
    */
   public static Provider getProvider() {
@@ -314,7 +312,7 @@ public class InstagramActivityUtil {
    * @return a valid Activity ID in format "id:instagram:part1:part2:...partN"
    */
   public static String formatId(String... idparts) {
-    return Joiner.on(":").join(Lists.asList("id:instagram", idparts));
+    return Joiner.on(":").join(Arrays.asList("id:instagram", idparts));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
index 2ab6ee4..f7b9d88 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
@@ -30,7 +30,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
@@ -45,8 +44,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.math.BigInteger;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -63,7 +62,7 @@ public class MoreoverProvider implements StreamsProvider {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(MoreoverProvider.class);
 
-  protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+  protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>();
 
   private List<MoreoverKeyData> keys;
 
@@ -77,7 +76,7 @@ public class MoreoverProvider implements StreamsProvider {
    */
   public MoreoverProvider(MoreoverConfiguration moreoverConfiguration) {
     this.config = moreoverConfiguration;
-    this.keys = Lists.newArrayList();
+    this.keys = new ArrayList<>();
     for ( MoreoverKeyData apiKey : config.getApiKeys()) {
       this.keys.add(apiKey);
     }
@@ -104,7 +103,7 @@ public class MoreoverProvider implements StreamsProvider {
 
     LOGGER.debug("readCurrent: {}", providerQueue.size());
 
-    Collection<StreamsDatum> currentIterator = Lists.newArrayList();
+    Collection<StreamsDatum> currentIterator = new ArrayList<>();
     Iterators.addAll(currentIterator, providerQueue.iterator());
 
     StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
@@ -180,9 +179,7 @@ public class MoreoverProvider implements StreamsProvider {
     provider.startStream();
     do {
       Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-      Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
-      while (iterator.hasNext()) {
-        StreamsDatum datum = iterator.next();
+      for (StreamsDatum datum : provider.readCurrent()) {
         String json;
         try {
           json = mapper.writeValueAsString(datum.getDocument());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java
index 589e647..c3a78bb 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java
@@ -28,7 +28,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.xml.JacksonXmlModule;
 import com.fasterxml.jackson.dataformat.xml.XmlFactory;
 import com.fasterxml.jackson.dataformat.xml.XmlMapper;
-import com.google.common.collect.Lists;
 import com.moreover.api.Article;
 import com.moreover.api.ArticlesResponse;
 import org.slf4j.Logger;
@@ -36,6 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.math.BigInteger;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
@@ -43,7 +43,6 @@ public class MoreoverResult implements Iterable<StreamsDatum> {
 
   private static final Logger logger = LoggerFactory.getLogger(MoreoverResult.class);
 
-  private ObjectMapper mapper;
   private XmlMapper xmlMapper;
 
   private String xmlString;
@@ -57,7 +56,7 @@ public class MoreoverResult implements Iterable<StreamsDatum> {
   private BigInteger maxSequencedId = BigInteger.ZERO;
 
   protected ArticlesResponse response;
-  protected List<StreamsDatum> list = Lists.newArrayList();
+  protected List<StreamsDatum> list = new ArrayList<>();
 
   protected MoreoverResult(String clientId, String xmlString, long start, long end) {
     this.xmlString = xmlString;
@@ -92,7 +91,7 @@ public class MoreoverResult implements Iterable<StreamsDatum> {
         DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
         Boolean.FALSE);
 
-    mapper = new ObjectMapper();
+    ObjectMapper mapper = new ObjectMapper();
 
     mapper
         .configure(

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java
index 2d93656..0422f7f 100644
--- a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java
+++ b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java
@@ -23,7 +23,6 @@ import org.apache.streams.moreover.MoreoverTestUtil;
 import org.apache.streams.moreover.MoreoverXmlActivitySerializer;
 import org.apache.streams.pojo.json.Activity;
 
-import com.google.common.collect.Lists;
 import org.apache.commons.io.IOUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -32,6 +31,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
 import java.nio.charset.Charset;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -49,7 +49,7 @@ public class MoreoverXmlActivitySerializerIT {
 
   @Test
   public void loadData() throws Exception {
-    List<Activity> activities = serializer.deserializeAll(Lists.newArrayList(xml));
+    List<Activity> activities = serializer.deserializeAll(Collections.singletonList(xml));
     for (Activity activity : activities) {
       MoreoverTestUtil.validate(activity);
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
index d3c763a..5bc4cf2 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
@@ -24,13 +24,13 @@ import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.rss.serializer.SyndEntryActivitySerializer;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
 import org.apache.commons.lang.NotImplementedException;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.LinkedList;
 import java.util.List;
 
 /**
@@ -53,7 +53,7 @@ public class RssTypeConverter implements StreamsProcessor {
 
   @Override
   public List<StreamsDatum> process(StreamsDatum datum) {
-    List<StreamsDatum> datums = Lists.newLinkedList();
+    List<StreamsDatum> datums = new LinkedList<>();
     if (datum.getDocument() instanceof ObjectNode) {
       Activity activity = this.serializer.deserialize((ObjectNode) datum.getDocument());
       datums.add(new StreamsDatum(activity, activity.getId(), DateTime.now().withZone(DateTimeZone.UTC)));



Mime
View raw message