streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From smar...@apache.org
Subject [8/9] incubator-streams git commit: STREAMS-463: Move every class in all repos underneath org.apache.streams, this closes apache/incubator-streams#356
Date Wed, 08 Feb 2017 04:32:54 GMT
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
deleted file mode 100644
index d9c7dd7..0000000
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.provider;
-
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.google.gplus.GPlusConfiguration;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.util.ComponentUtils;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
-
-import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.http.javanet.NetHttpTransport;
-import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.services.plus.Plus;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.gson.Gson;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.math.BigInteger;
-import java.security.GeneralSecurityException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Provider that creates a GPlus client and will run task that queue data to an outing queue.
- */
-public abstract class AbstractGPlusProvider implements StreamsProvider {
-
-  public static final String STREAMS_ID = "AbstractGPlusProvider";
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractGPlusProvider.class);
-  private static final Set<String> SCOPE = new HashSet<String>() {
-    {
-      add("https://www.googleapis.com/auth/plus.login");
-    }
-  };
-  private static final int MAX_BATCH_SIZE = 1000;
-
-  private static final HttpTransport TRANSPORT = new NetHttpTransport();
-  private static final JacksonFactory JSON_FACTORY = new JacksonFactory();
-  private static final Gson GSON = new Gson();
-
-  private GPlusConfiguration config;
-
-  List<ListenableFuture<Object>> futures = new ArrayList<>();
-
-  private ListeningExecutorService executor;
-
-  private BlockingQueue<StreamsDatum> datumQueue;
-  private AtomicBoolean isComplete;
-  private boolean previousPullWasEmpty;
-
-  protected GoogleCredential credential;
-  protected Plus plus;
-
-  public AbstractGPlusProvider() {
-    this.config = new ComponentConfigurator<>(GPlusConfiguration.class)
-        .detectConfiguration(StreamsConfigurator.getConfig().getConfig("gplus"));
-  }
-
-  public AbstractGPlusProvider(GPlusConfiguration config) {
-    this.config = config;
-  }
-
-  @Override
-  public void prepare(Object configurationObject) {
-
-    Objects.requireNonNull(config.getOauth().getPathToP12KeyFile());
-    Objects.requireNonNull(config.getOauth().getAppName());
-    Objects.requireNonNull(config.getOauth().getServiceAccountEmailAddress());
-
-    try {
-      this.plus = createPlusClient();
-    } catch (IOException | GeneralSecurityException ex) {
-      LOGGER.error("Failed to created oauth for GPlus : {}", ex);
-      throw new RuntimeException(ex);
-    }
-    // GPlus rate limits you to 5 calls per second, so there is not a need to execute more than one
-    // collector unless you have multiple oauth tokens
-    //TODO make this configurable based on the number of oauth tokens
-    this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
-    this.datumQueue = new LinkedBlockingQueue<>(1000);
-    this.isComplete = new AtomicBoolean(false);
-    this.previousPullWasEmpty = false;
-  }
-
-  @Override
-  public void startStream() {
-
-    BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2);
-    for (UserInfo user : this.config.getGooglePlusUsers()) {
-      if (this.config.getDefaultAfterDate() != null && user.getAfterDate() == null) {
-        user.setAfterDate(this.config.getDefaultAfterDate());
-      }
-      if (this.config.getDefaultBeforeDate() != null && user.getBeforeDate() == null) {
-        user.setBeforeDate(this.config.getDefaultBeforeDate());
-      }
-      this.executor.submit(getDataCollector(backOffStrategy, this.datumQueue, this.plus, user));
-    }
-    this.executor.shutdown();
-  }
-
-  protected abstract Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo);
-
-  @Override
-  public String getId() {
-    return STREAMS_ID;
-  }
-
-  @Override
-  public StreamsResultSet readCurrent() {
-    BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>();
-    int batchCount = 0;
-    while (!this.datumQueue.isEmpty() && batchCount < MAX_BATCH_SIZE) {
-      StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.datumQueue);
-      if (datum != null) {
-        ++batchCount;
-        ComponentUtils.offerUntilSuccess(datum, batch);
-      }
-    }
-    boolean pullIsEmpty = batch.isEmpty() && this.datumQueue.isEmpty() && this.executor.isTerminated();
-    this.isComplete.set(this.previousPullWasEmpty && pullIsEmpty);
-    this.previousPullWasEmpty = pullIsEmpty;
-    return new StreamsResultSet(batch);
-  }
-
-  @Override
-  public StreamsResultSet readNew(BigInteger sequence) {
-    return null;
-  }
-
-  @Override
-  public StreamsResultSet readRange(DateTime start, DateTime end) {
-    return null;
-  }
-
-  @VisibleForTesting
-  protected Plus createPlusClient() throws IOException, GeneralSecurityException {
-    credential = new GoogleCredential.Builder()
-        .setJsonFactory(JSON_FACTORY)
-        .setTransport(TRANSPORT)
-        .setServiceAccountScopes(SCOPE)
-        .setServiceAccountId(this.config.getOauth().getServiceAccountEmailAddress())
-        .setServiceAccountPrivateKeyFromP12File(new File(this.config.getOauth().getPathToP12KeyFile()))
-        .build();
-    return new Plus.Builder(TRANSPORT,JSON_FACTORY, credential).setApplicationName(this.config.getOauth().getAppName()).build();
-  }
-
-  @Override
-  public void cleanUp() {
-    ComponentUtils.shutdownExecutor(this.executor, 10, 10);
-    this.executor = null;
-  }
-
-  public GPlusConfiguration getConfig() {
-    return config;
-  }
-
-  public void setConfig(GPlusConfiguration config) {
-    this.config = config;
-  }
-
-  /**
-   * Set and overwrite the default before date that was read from the configuration file.
-   * @param defaultBeforeDate defaultBeforeDate
-   */
-  public void setDefaultBeforeDate(DateTime defaultBeforeDate) {
-    this.config.setDefaultBeforeDate(defaultBeforeDate);
-  }
-
-  /**
-   * Set and overwrite the default after date that was read from teh configuration file.
-   * @param defaultAfterDate defaultAfterDate
-   */
-  public void setDefaultAfterDate(DateTime defaultAfterDate) {
-    this.config.setDefaultAfterDate(defaultAfterDate);
-  }
-
-  /**
-   * Sets and overwrite the user info from the configuaration file.  Uses the defaults before and after dates.
-   * @param userIds userIds
-   */
-  public void setUserInfoWithDefaultDates(Set<String> userIds) {
-    List<UserInfo> gplusUsers = new LinkedList<>();
-    for (String userId : userIds) {
-      UserInfo user = new UserInfo();
-      user.setUserId(userId);
-      user.setAfterDate(this.config.getDefaultAfterDate());
-      user.setBeforeDate(this.config.getDefaultBeforeDate());
-      gplusUsers.add(user);
-    }
-    this.config.setGooglePlusUsers(gplusUsers);
-  }
-
-  /**
-   * Set and overwrite user into from the configuration file. Only sets after date.
-   * @param usersAndAfterDates usersAndAfterDates
-   */
-  public void setUserInfoWithAfterDate(Map<String, DateTime> usersAndAfterDates) {
-    List<UserInfo> gplusUsers = new LinkedList<>();
-    for (String userId : usersAndAfterDates.keySet()) {
-      UserInfo user = new UserInfo();
-      user.setUserId(userId);
-      user.setAfterDate(usersAndAfterDates.get(userId));
-      gplusUsers.add(user);
-    }
-    this.config.setGooglePlusUsers(gplusUsers);
-  }
-
-  @Override
-  public boolean isRunning() {
-    if (datumQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
-      LOGGER.info("Completed");
-      isComplete.set(true);
-      LOGGER.info("Exiting");
-    }
-    return !isComplete.get();
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
deleted file mode 100644
index eda55d2..0000000
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.provider;
-
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.pojo.json.Activity;
-
-import com.google.gplus.serializer.util.GooglePlusActivityUtil;
-import org.apache.commons.lang.NotImplementedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * GPlusActivitySerializer converts gplus activities to as1 activities.
- */
-public class GPlusActivitySerializer implements ActivitySerializer<com.google.api.services.plus.model.Activity> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(GPlusActivitySerializer.class);
-
-  AbstractGPlusProvider provider;
-
-  public GPlusActivitySerializer(AbstractGPlusProvider provider) {
-
-    this.provider = provider;
-  }
-
-  public GPlusActivitySerializer() {
-  }
-
-  @Override
-  public String serializationFormat() {
-    return "gplus.v1";
-  }
-
-  @Override
-  public com.google.api.services.plus.model.Activity serialize(Activity deserialized) {
-    throw new NotImplementedException("Not currently implemented");
-  }
-
-  @Override
-  public Activity deserialize(com.google.api.services.plus.model.Activity gplusActivity) {
-    Activity activity = new Activity();
-
-    GooglePlusActivityUtil.updateActivity(gplusActivity, activity);
-    return activity;
-  }
-
-  @Override
-  public List<Activity> deserializeAll(List<com.google.api.services.plus.model.Activity> serializedList) {
-    throw new NotImplementedException("Not currently implemented");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java
deleted file mode 100644
index 351e5be..0000000
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.provider;
-
-import org.apache.streams.util.api.requests.backoff.BackOffException;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * GPlusDataCollector collects GPlus Data on behalf of providers.
- */
-public abstract class GPlusDataCollector implements Runnable {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(GPlusDataCollector.class);
-
-  /**
-   * Looks at the status code of the exception.  If the code indicates that the request should be retried,
-   * it executes the back off strategy and returns true.
-   * @param gjre GoogleJsonResponseException
-   * @param backOff BackOffStrategy
-   * @return returns true if the error code of the exception indicates the request should be retried.
-   */
-  public boolean backoffAndIdentifyIfRetry(GoogleJsonResponseException gjre, BackOffStrategy backOff) throws BackOffException {
-    boolean tryAgain = false;
-    switch (gjre.getStatusCode()) {
-      case 400 :
-        LOGGER.warn("Bad Request  : {}",  gjre);
-        break;
-      case 401 :
-        LOGGER.warn("Invalid Credentials : {}", gjre);
-        break;
-      case 403 :
-        LOGGER.warn("Possible rate limit exception. Retrying. : {}", gjre.getMessage());
-        backOff.backOff();
-        tryAgain = true;
-        break;
-      case 503 :
-        LOGGER.warn("Google Backend Service Error : {}", gjre);
-        break;
-      default:
-        LOGGER.warn("Google Service returned error : {}", gjre);
-        tryAgain = true;
-        backOff.backOff();
-        break;
-    }
-    return tryAgain;
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
deleted file mode 100644
index 31d8b5c..0000000
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.provider;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.fasterxml.jackson.databind.ser.std.StdSerializer;
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.services.plus.Plus;
-import com.google.api.services.plus.model.Activity;
-import com.google.api.services.plus.model.ActivityFeed;
-import com.google.gplus.serializer.util.GPlusActivityDeserializer;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Collects the public activities of a GPlus user. Has ability to filter by date ranges.
- */
-public class GPlusUserActivityCollector extends GPlusDataCollector {
-
-  /**
-   * Key for all public activities
-   * https://developers.google.com/+/api/latest/activities/list
-   */
-  private static final String PUBLIC_COLLECTION = "public";
-  /**
-   * Max results allowed per request
-   * https://developers.google.com/+/api/latest/activities/list
-   */
-  private static final long MAX_RESULTS = 100;
-  private static final int MAX_ATTEMPTS = 5;
-  private static final Logger LOGGER = LoggerFactory.getLogger(GPlusUserActivityCollector.class);
-  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
-  static { //set up mapper for Google Activity Object
-    SimpleModule simpleModule = new SimpleModule();
-    simpleModule.addDeserializer(Activity.class, new GPlusActivityDeserializer());
-    simpleModule.addSerializer(
-        com.google.api.client.util.DateTime.class,
-        new StdSerializer<com.google.api.client.util.DateTime>(com.google.api.client.util.DateTime.class) {
-          @Override
-          public void serialize(
-              com.google.api.client.util.DateTime dateTime,
-              JsonGenerator jsonGenerator,
-              SerializerProvider serializerProvider)
-              throws IOException {
-            jsonGenerator.writeString(dateTime.toStringRfc3339());
-          }
-        });
-    MAPPER.registerModule(simpleModule);
-    MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-  }
-
-  private BlockingQueue<StreamsDatum> datumQueue;
-  private BackOffStrategy backOff;
-  private Plus plus;
-  private UserInfo userInfo;
-
-  /**
-   * GPlusUserActivityCollector constructor.
-   * @param plus Plus
-   * @param datumQueue BlockingQueue<StreamsDatum>
-   * @param backOff BackOffStrategy
-   * @param userInfo UserInfo
-   */
-  public GPlusUserActivityCollector(Plus plus, BlockingQueue<StreamsDatum> datumQueue, BackOffStrategy backOff, UserInfo userInfo) {
-    this.plus = plus;
-    this.datumQueue = datumQueue;
-    this.backOff = backOff;
-    this.userInfo = userInfo;
-  }
-
-  @Override
-  public void run() {
-    collectActivityData();
-  }
-
-  protected void collectActivityData() {
-    try {
-      ActivityFeed feed = null;
-      boolean tryAgain = false;
-      int attempt = 0;
-      DateTime afterDate = userInfo.getAfterDate();
-      DateTime beforeDate = userInfo.getBeforeDate();
-      do {
-        try {
-          if (feed == null) {
-            feed = this.plus.activities().list(this.userInfo.getUserId(), PUBLIC_COLLECTION)
-                .setMaxResults(MAX_RESULTS).execute();
-          } else {
-            feed = this.plus.activities().list(this.userInfo.getUserId(), PUBLIC_COLLECTION)
-                .setMaxResults(MAX_RESULTS)
-                .setPageToken(feed.getNextPageToken()).execute();
-          }
-          this.backOff.reset(); //successful pull reset api.
-          for (com.google.api.services.plus.model.Activity activity : feed.getItems()) {
-            DateTime published = new DateTime(activity.getPublished().getValue());
-            if ((afterDate == null && beforeDate == null)
-                || (beforeDate == null && afterDate.isBefore(published))
-                || (afterDate == null && beforeDate.isAfter(published))
-                || ((afterDate != null && beforeDate != null) && (afterDate.isBefore(published) && beforeDate.isAfter(published)))) {
-              String json = MAPPER.writeValueAsString(activity);
-              this.datumQueue.put(new StreamsDatum(json, activity.getId()));
-            } else if (afterDate != null && afterDate.isAfter(published)) {
-              feed.setNextPageToken(null); // do not fetch next page
-              break;
-            }
-          }
-        } catch (GoogleJsonResponseException gjre) {
-          tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOff);
-          ++attempt;
-        }
-      }
-      while ((tryAgain || (feed != null && feed.getNextPageToken() != null)) && attempt < MAX_ATTEMPTS);
-    } catch (Throwable th) {
-      if (th instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
-      }
-      th.printStackTrace();
-      LOGGER.warn("Unable to pull Activities for user={} : {}",this.userInfo.getUserId(), th);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
deleted file mode 100644
index 97b08fd..0000000
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.provider;
-
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.GPlusConfiguration;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-
-import com.google.api.services.plus.Plus;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.google.gson.Gson;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.PrintStream;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-/**
- *  Retrieve recent activity from a list of accounts.
- *
- *  <p/>
- *  To use from command line:
- *
- *  <p/>
- *  Supply (at least) the following required configuration in application.conf:
- *
- *  <p/>
- *  gplus.oauth.pathToP12KeyFile
- *  gplus.oauth.serviceAccountEmailAddress
- *  gplus.apiKey
- *  gplus.googlePlusUsers
- *
- *  <p/>
- *  Launch using:
- *
- *  <p/>
- *  mvn exec:java -Dexec.mainClass=com.google.gplus.provider.GPlusUserActivityProvider -Dexec.args="application.conf activity.json"
- */
-public class GPlusUserActivityProvider extends AbstractGPlusProvider {
-
-  private static final String STREAMS_ID = "GPlusUserActivityProvider";
-
-  public GPlusUserActivityProvider() {
-    super();
-  }
-
-  public GPlusUserActivityProvider(GPlusConfiguration config) {
-    super(config);
-  }
-
-  @Override
-  public String getId() {
-    return STREAMS_ID;
-  }
-
-  @Override
-  protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) {
-    return new GPlusUserActivityCollector(plus, queue, strategy, userInfo);
-  }
-
-  /**
-   * Retrieve recent activity from a list of accounts.
-   * @param args args
-   * @throws Exception Exception
-   */
-  public static void main(String[] args) throws Exception {
-
-    Preconditions.checkArgument(args.length >= 2);
-
-    String configfile = args[0];
-    String outfile = args[1];
-
-    Config reference = ConfigFactory.load();
-    File file = new File(configfile);
-    assert (file.exists());
-    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false));
-
-    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-
-    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
-    GPlusConfiguration config = new ComponentConfigurator<>(GPlusConfiguration.class).detectConfiguration(typesafe, "gplus");
-    GPlusUserActivityProvider provider = new GPlusUserActivityProvider(config);
-
-    Gson gson = new Gson();
-
-    PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
-    provider.prepare(config);
-    provider.startStream();
-    do {
-      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-      for (StreamsDatum datum : provider.readCurrent()) {
-        String json;
-        if (datum.getDocument() instanceof String) {
-          json = (String) datum.getDocument();
-        } else {
-          json = gson.toJson(datum.getDocument());
-        }
-        outStream.println(json);
-      }
-    }
-    while ( provider.isRunning());
-    provider.cleanUp();
-    outStream.flush();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java
deleted file mode 100644
index e0df58d..0000000
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.provider;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.services.plus.Plus;
-import com.google.api.services.plus.model.Person;
-import com.google.gplus.serializer.util.GPlusPersonDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Collects user profile information for a specific GPlus user.
- */
-public  class GPlusUserDataCollector extends GPlusDataCollector {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(GPlusUserDataCollector.class);
-  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-  private static final int MAX_ATTEMPTS = 5;
-
-  static { //set up Mapper for Person objects
-    SimpleModule simpleModule = new SimpleModule();
-    simpleModule.addDeserializer(Person.class, new GPlusPersonDeserializer());
-    MAPPER.registerModule(simpleModule);
-    MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-  }
-
-  private BackOffStrategy backOffStrategy;
-  private Plus plus;
-  private BlockingQueue<StreamsDatum> datumQueue;
-  private UserInfo userInfo;
-
-  /**
-   * GPlusUserDataCollector constructor.
-   * @param plus Plus
-   * @param backOffStrategy BackOffStrategy
-   * @param datumQueue BlockingQueue of StreamsDatum
-   * @param userInfo UserInfo
-   */
-  public GPlusUserDataCollector(Plus plus, BackOffStrategy backOffStrategy, BlockingQueue<StreamsDatum> datumQueue, UserInfo userInfo) {
-    this.plus = plus;
-    this.backOffStrategy = backOffStrategy;
-    this.datumQueue = datumQueue;
-    this.userInfo = userInfo;
-  }
-
-  protected void queueUserHistory() {
-    try {
-      boolean tryAgain;
-      int attempts = 0;
-      com.google.api.services.plus.model.Person person = null;
-      do {
-        try {
-          person = this.plus.people().get(userInfo.getUserId()).execute();
-          this.backOffStrategy.reset();
-          tryAgain = person == null;
-        } catch (GoogleJsonResponseException gjre) {
-          tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOffStrategy);
-        }
-        ++attempts;
-      }
-      while (tryAgain && attempts < MAX_ATTEMPTS);
-      String json = MAPPER.writeValueAsString(person);
-      this.datumQueue.put(new StreamsDatum(json, person.getId()));
-    } catch (Throwable throwable) {
-      LOGGER.warn("Unable to pull user data for user={} : {}", userInfo.getUserId(), throwable);
-      if (throwable instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-
-  @Override
-  public void run() {
-    queueUserHistory();
-  }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
deleted file mode 100644
index 28bcb55..0000000
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.provider;
-
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.GPlusConfiguration;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-
-import com.google.api.services.plus.Plus;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.google.gson.Gson;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.PrintStream;
-import java.util.Iterator;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-/**
- *  Retrieve current profile status for a list of accounts.
- *
- *  <p/>
- *  To use from command line:
- *
- *  <p/>
- *  Supply (at least) the following required configuration in application.conf:
- *
- *  <p/>
- *  gplus.oauth.pathToP12KeyFile
- *  gplus.oauth.serviceAccountEmailAddress
- *  gplus.apiKey
- *  gplus.googlePlusUsers
- *
- *  <p/>
- *  Launch using:
- *
- *  <p/>
- *  mvn exec:java -Dexec.mainClass=com.google.gplus.provider.GPlusUserDataProvider -Dexec.args="application.conf profiles.json"
- */
-public class GPlusUserDataProvider extends AbstractGPlusProvider {
-
-  public static final String STREAMS_ID = "GPlusUserDataProvider";
-
-  public GPlusUserDataProvider() {
-    super();
-  }
-
-  public GPlusUserDataProvider(GPlusConfiguration config) {
-    super(config);
-  }
-
-  @Override
-  public String getId() {
-    return STREAMS_ID;
-  }
-
-  @Override
-  protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) {
-    return new GPlusUserDataCollector(plus, strategy, queue, userInfo);
-  }
-
-  /**
-   * Retrieve current profile status for a list of accounts.
-   * @param args args
-   * @throws Exception Exception
-   */
-  public static void main(String[] args) throws Exception {
-
-    Preconditions.checkArgument(args.length >= 2);
-
-    String configfile = args[0];
-    String outfile = args[1];
-
-    Config reference = ConfigFactory.load();
-    File file = new File(configfile);
-    assert (file.exists());
-    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false));
-
-    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-
-    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
-    GPlusConfiguration config = new ComponentConfigurator<>(GPlusConfiguration.class).detectConfiguration(typesafe, "gplus");
-    GPlusUserDataProvider provider = new GPlusUserDataProvider(config);
-
-    Gson gson = new Gson();
-
-    PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
-    provider.prepare(config);
-    provider.startStream();
-    do {
-      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-      Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
-      while (iterator.hasNext()) {
-        StreamsDatum datum = iterator.next();
-        String json;
-        if (datum.getDocument() instanceof String) {
-          json = (String) datum.getDocument();
-        } else {
-          json = gson.toJson(datum.getDocument());
-        }
-        outStream.println(json);
-      }
-    }
-    while ( provider.isRunning());
-    provider.cleanUp();
-    outStream.flush();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusActivityDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusActivityDeserializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusActivityDeserializer.java
deleted file mode 100644
index f13267a..0000000
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusActivityDeserializer.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.serializer.util;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.api.client.util.DateTime;
-import com.google.api.services.plus.model.Activity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Custom deserializer for GooglePlus' Person model.
- */
-public class GPlusActivityDeserializer extends JsonDeserializer<Activity> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(GPlusActivityDeserializer.class);
-
-  /**
-   * Because the GooglePlus Activity object {@link com.google.api.services.plus.model.Activity} contains complex objects
-   * within its hierarchy, we have to use a custom deserializer
-   *
-   * @param jsonParser jsonParser
-   * @param deserializationContext deserializationContext
-   * @return The deserialized {@link com.google.api.services.plus.model.Activity} object
-   * @throws IOException IOException
-   * @throws JsonProcessingException JsonProcessingException
-   */
-  @Override
-  public Activity deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
-
-    JsonNode node = jsonParser.getCodec().readTree(jsonParser);
-    Activity activity = new Activity();
-
-    try {
-      activity.setUrl(node.get("url").asText());
-      activity.setEtag(node.get("etag").asText());
-      activity.setTitle(node.get("title").asText());
-      activity.setPublished(DateTime.parseRfc3339(node.get("published").asText()));
-      activity.setUpdated(DateTime.parseRfc3339(node.get("updated").asText()));
-      activity.setId(node.get("id").asText());
-      activity.setVerb(node.get("verb").asText());
-
-      activity.setActor(buildActor(node));
-
-      activity.setObject(buildPlusObject(node));
-    } catch (Exception ex) {
-      LOGGER.error("Exception while trying to deserialize activity object: {}", ex);
-    }
-
-    return activity;
-  }
-
-  /**
-   * Given a raw JsonNode, build out the G+ {@link com.google.api.services.plus.model.Activity.Actor} object
-   *
-   * @param node node
-   * @return {@link com.google.api.services.plus.model.Activity.Actor} object
-   */
-  private Activity.Actor buildActor(JsonNode node) {
-    Activity.Actor actor = new Activity.Actor();
-    JsonNode actorNode = node.get("actor");
-
-    actor.setId(actorNode.get("id").asText());
-    actor.setDisplayName(actorNode.get("displayName").asText());
-    actor.setUrl(actorNode.get("url").asText());
-
-    Activity.Actor.Image image = new Activity.Actor.Image();
-    JsonNode imageNode = actorNode.get("image");
-    image.setUrl(imageNode.get("url").asText());
-
-    actor.setImage(image);
-
-    return actor;
-  }
-
-  /**
-   * Given a JsonNode, build out all aspects of the {@link com.google.api.services.plus.model.Activity.PlusObject} object
-   *
-   * @param node node
-   * @return {@link com.google.api.services.plus.model.Activity.PlusObject} object
-   */
-  private Activity.PlusObject buildPlusObject(JsonNode node) {
-    Activity.PlusObject object = new Activity.PlusObject();
-    JsonNode objectNode = node.get("object");
-    object.setObjectType(objectNode.get("objectType").asText());
-    object.setContent(objectNode.get("content").asText());
-    object.setUrl(objectNode.get("url").asText());
-
-    Activity.PlusObject.Replies replies = new Activity.PlusObject.Replies();
-    JsonNode repliesNode = objectNode.get("replies");
-    replies.setTotalItems(repliesNode.get("totalItems").asLong());
-    replies.setSelfLink(repliesNode.get("selfLink").asText());
-    object.setReplies(replies);
-
-    Activity.PlusObject.Plusoners plusoners = new Activity.PlusObject.Plusoners();
-    JsonNode plusonersNode = objectNode.get("plusoners");
-    plusoners.setTotalItems(plusonersNode.get("totalItems").asLong());
-    plusoners.setSelfLink(plusonersNode.get("selfLink").asText());
-    object.setPlusoners(plusoners);
-
-    Activity.PlusObject.Resharers resharers = new Activity.PlusObject.Resharers();
-    JsonNode resharersNode = objectNode.get("resharers");
-    resharers.setTotalItems(resharersNode.get("totalItems").asLong());
-    resharers.setSelfLink(resharersNode.get("selfLink").asText());
-    object.setResharers(resharers);
-
-    object.setAttachments(buildAttachments(objectNode));//attachments);
-
-    return object;
-  }
-
-  /**
-   * Given a raw JsonNode representation of an Activity's attachments, build out that
-   * list of {@link com.google.api.services.plus.model.Activity.PlusObject.Attachments} objects
-   *
-   * @param objectNode objectNode
-   * @return list of {@link com.google.api.services.plus.model.Activity.PlusObject.Attachments} objects
-   */
-  private List<Activity.PlusObject.Attachments> buildAttachments(JsonNode objectNode) {
-    List<Activity.PlusObject.Attachments> attachments = new ArrayList<>();
-    if ( objectNode.has("attachments") ) {
-      for (JsonNode attachmentNode : objectNode.get("attachments")) {
-        Activity.PlusObject.Attachments attachments1 = new Activity.PlusObject.Attachments();
-        attachments1.setObjectType(attachmentNode.get("objectType").asText());
-        if (attachmentNode.has("displayName")) {
-          attachments1.setDisplayName(attachmentNode.get("displayName").asText());
-        }
-        if (attachmentNode.has("content")) {
-          attachments1.setContent(attachmentNode.get("content").asText());
-        }
-        if (attachmentNode.has("url")) {
-          attachments1.setUrl(attachmentNode.get("url").asText());
-        }
-
-        if( attachmentNode.has("image")) {
-          Activity.PlusObject.Attachments.Image image1 = new Activity.PlusObject.Attachments.Image();
-          JsonNode imageNode1 = attachmentNode.get("image");
-          image1.setUrl(imageNode1.get("url").asText());
-          attachments1.setImage(image1);
-        }
-
-        attachments.add(attachments1);
-      }
-    }
-    return attachments;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusCommentDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusCommentDeserializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusCommentDeserializer.java
deleted file mode 100644
index 5b92c43..0000000
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusCommentDeserializer.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.serializer.util;
-
-import org.apache.streams.jackson.StreamsJacksonMapper;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.api.client.util.DateTime;
-import com.google.api.services.plus.model.Comment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * GPlusCommentDeserializer converts gplus comments to as1 comments.
- */
-public class GPlusCommentDeserializer  extends JsonDeserializer<Comment> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(GPlusActivityDeserializer.class);
-
-  /**
-   * Because the GooglePlus Comment object {@link com.google.api.services.plus.model.Comment} contains complex objects
-   * within its hierarchy, we have to use a custom deserializer
-   *
-   * @param jsonParser jsonParser
-   * @param deserializationContext deserializationContext
-   * @return The deserialized {@link com.google.api.services.plus.model.Comment} object
-   * @throws java.io.IOException IOException
-   * @throws com.fasterxml.jackson.core.JsonProcessingException JsonProcessingException
-   */
-  @Override
-  public Comment deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
-      throws IOException, JsonProcessingException {
-
-    JsonNode node = jsonParser.getCodec().readTree(jsonParser);
-    ObjectMapper objectMapper = StreamsJacksonMapper.getInstance();
-    Comment comment = new Comment();
-
-    try {
-      comment.setEtag(node.get("etag").asText());
-      comment.setVerb(node.get("verb").asText());
-      comment.setId(node.get("id").asText());
-      comment.setPublished(DateTime.parseRfc3339(node.get("published").asText()));
-      comment.setUpdated(DateTime.parseRfc3339(node.get("updated").asText()));
-
-      Comment.Actor actor = new Comment.Actor();
-      JsonNode actorNode = node.get("actor");
-      actor.setDisplayName(actorNode.get("displayName").asText());
-      actor.setUrl(actorNode.get("url").asText());
-
-      Comment.Actor.Image image = new Comment.Actor.Image();
-      JsonNode imageNode = actorNode.get("image");
-      image.setUrl(imageNode.get("url").asText());
-
-      actor.setImage(image);
-
-      comment.setObject(objectMapper.readValue(objectMapper.writeValueAsString(node.get("object")), Comment.PlusObject.class));
-
-      comment.setSelfLink(node.get("selfLink").asText());
-
-      List<Comment.InReplyTo> replies = new ArrayList<>();
-      for (JsonNode reply : node.get("inReplyTo")) {
-        Comment.InReplyTo irt = objectMapper.readValue(objectMapper.writeValueAsString(reply), Comment.InReplyTo.class);
-        replies.add(irt);
-      }
-
-      comment.setInReplyTo(replies);
-
-      Comment.Plusoners plusoners = new Comment.Plusoners();
-      JsonNode plusonersNode = node.get("plusoners");
-      plusoners.setTotalItems(plusonersNode.get("totalItems").asLong());
-      comment.setPlusoners(plusoners);
-    } catch (Exception ex) {
-      LOGGER.error("Exception while trying to deserialize activity object: {}", ex);
-    }
-
-    return comment;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusEventClassifier.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusEventClassifier.java
deleted file mode 100644
index 4923d3e..0000000
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusEventClassifier.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.serializer.util;
-
-import org.apache.streams.jackson.StreamsJacksonMapper;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.api.services.plus.model.Activity;
-import com.google.api.services.plus.model.Person;
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang.StringUtils;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Objects;
-
-/**
- * GPlusEventClassifier classifies GPlus Events.
- */
-public class GPlusEventClassifier implements Serializable {
-
-  private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-  private static final String ACTIVITY_IDENTIFIER = "\"plus#activity\"";
-  private static final String PERSON_IDENTIFIER = "\"plus#person\"";
-
-  /**
-   * Detect likely class of String json.
-   * @param json String json
-   * @return likely class
-   */
-  public static Class detectClass(String json) {
-    Objects.requireNonNull(json);
-    Preconditions.checkArgument(StringUtils.isNotEmpty(json));
-
-    ObjectNode objectNode;
-    try {
-      objectNode = (ObjectNode) mapper.readTree(json);
-    } catch (IOException ex) {
-      ex.printStackTrace();
-      return null;
-    }
-
-    if (objectNode.findValue("kind") != null && objectNode.get("kind").toString().equals(ACTIVITY_IDENTIFIER)) {
-      return Activity.class;
-    } else if (objectNode.findValue("kind") != null && objectNode.get("kind").toString().equals(PERSON_IDENTIFIER)) {
-      return Person.class;
-    } else  {
-      return ObjectNode.class;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusPersonDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusPersonDeserializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusPersonDeserializer.java
deleted file mode 100644
index 38bb168..0000000
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusPersonDeserializer.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.serializer.util;
-
-import org.apache.streams.jackson.StreamsJacksonMapper;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.api.services.plus.model.Person;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Custom deserializer for GooglePlus' Person model.
- */
-public class GPlusPersonDeserializer extends JsonDeserializer<Person> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(GPlusPersonDeserializer.class);
-
-  /**
-   * Because the GooglePlus Person object contains complex objects within its hierarchy, we have to use
-   * a custom deserializer
-   *
-   * @param jsonParser jsonParser
-   * @param deserializationContext deserializationContext
-   * @return The deserialized {@link com.google.api.services.plus.model.Person} object
-   * @throws IOException IOException
-   * @throws JsonProcessingException JsonProcessingException
-   */
-  @Override
-  public Person deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
-    ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    JsonNode node = jsonParser.getCodec().readTree(jsonParser);
-    Person person = new Person();
-    try {
-      person.setId(node.get("id").asText());
-      person.setCircledByCount((Integer) (node.get("circledByCount")).numberValue());
-      person.setDisplayName(node.get("displayName").asText());
-      if( node.has("etag")) {
-        person.setEtag(node.get("etag").asText());
-      }
-      if( node.has("gender")) {
-        person.setGender(node.get("gender").asText());
-      }
-
-      Person.Image image = new Person.Image();
-      if( node.has("image") ) {
-        JsonNode imageNode = node.get("image");
-        image.setIsDefault(imageNode.get("isDefault").asBoolean());
-        image.setUrl(imageNode.get("url").asText());
-        person.setImage(image);
-      }
-
-      person.setIsPlusUser(node.get("isPlusUser").asBoolean());
-      person.setKind(node.get("kind").asText());
-
-      JsonNode nameNode = node.get("name");
-      Person.Name name = mapper.readValue(mapper.writeValueAsString(nameNode), Person.Name.class);
-      person.setName(name);
-
-      person.setObjectType(node.get("objectType").asText());
-
-      List<Person.Organizations> organizations = new ArrayList<>();
-      if( node.has("organizations")) {
-        for (JsonNode orgNode : node.get("organizations")) {
-          Person.Organizations org = mapper.readValue(mapper.writeValueAsString(orgNode), Person.Organizations.class);
-          organizations.add(org);
-        }
-        person.setOrganizations(organizations);
-      }
-
-      person.setUrl(node.get("url").asText());
-      person.setVerified(node.get("verified").asBoolean());
-
-      List<Person.Emails> emails = new ArrayList<>();
-
-      if ( node.has("emails")) {
-        for (JsonNode emailNode : node.get("emails")) {
-          Person.Emails email = mapper.readValue(mapper.writeValueAsString(emailNode), Person.Emails.class);
-          emails.add(email);
-        }
-      }
-
-      if ( node.has("tagline")) {
-        person.setTagline(node.get("tagline").asText());
-      }
-      if ( node.has("aboutMe")) {
-        person.setAboutMe(node.get("aboutMe").asText());
-      }
-    } catch (Exception ex) {
-      LOGGER.error("Exception while trying to deserialize a Person object: {}", ex);
-    }
-
-    return person;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GooglePlusActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GooglePlusActivityUtil.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GooglePlusActivityUtil.java
deleted file mode 100644
index 27b25a2..0000000
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GooglePlusActivityUtil.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.serializer.util;
-
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.pojo.extensions.ExtensionUtil;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.apache.streams.pojo.json.Image;
-import org.apache.streams.pojo.json.Provider;
-
-import com.google.api.services.plus.model.Comment;
-import com.google.api.services.plus.model.Person;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * GooglePlusActivityUtil helps convert c.g.Person and c.g.Activity into o.a.s.p.j.o.Page and o.a.s.p.j.Activity.
- */
-public class GooglePlusActivityUtil {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(GooglePlusActivityUtil.class);
-
-  /**
-   * Given a {@link Person} object and an
-   * {@link Activity} object, fill out the appropriate details.
-   *
-   * @param item Person
-   * @param activity Activity
-   * @throws ActivitySerializerException ActivitySerializerException
-   */
-  public static void updateActivity(Person item, Activity activity) throws ActivitySerializerException {
-    activity.setActor(buildActor(item));
-    activity.setVerb("update");
-
-    activity.setId(formatId(activity.getVerb(), Optional.ofNullable(item.getId()).orElse(null)));
-
-    activity.setProvider(getProvider());
-  }
-
-  /**
-   * Given a {@link List} of {@link Comment} objects and an
-   * {@link Activity}, update that Activity to contain all comments
-   *
-   * @param comments input List of Comment
-   * @param activity output Activity
-   */
-  public static void updateActivity(List<Comment> comments, Activity activity) {
-    for (Comment comment : comments) {
-      addComment(activity, comment);
-    }
-
-    Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
-    extensions.put("comment_count", comments.size());
-  }
-
-  /**
-   * Given a Google Plus {@link com.google.api.services.plus.model.Activity},
-   * convert that into an Activity streams formatted {@link Activity}
-   *
-   * @param gPlusActivity input c.g.a.s.p.m.Activity
-   * @param activity output o.a.s.p.j.Activity
-   */
-  public static void updateActivity(com.google.api.services.plus.model.Activity gPlusActivity, Activity activity) {
-    activity.setActor(buildActor(gPlusActivity.getActor()));
-    activity.setVerb("post");
-    activity.setTitle(gPlusActivity.getTitle());
-    activity.setUrl(gPlusActivity.getUrl());
-    activity.setProvider(getProvider());
-
-    if (gPlusActivity.getObject() != null) {
-      activity.setContent(gPlusActivity.getObject().getContent());
-    }
-
-    activity.setId(formatId(activity.getVerb(), Optional.ofNullable(gPlusActivity.getId()).orElse(null)));
-
-    DateTime published = new DateTime(String.valueOf(gPlusActivity.getPublished()));
-    activity.setPublished(published);
-
-    setObject(activity, gPlusActivity.getObject());
-    addGPlusExtensions(activity, gPlusActivity);
-  }
-
-  /**
-   * Adds a single {@link Comment} to the Object.Attachments
-   * section of the passed in {@link Activity}
-   *
-   * @param activity output o.a.s.p.j.Activity
-   * @param comment input c.g.a.s.p.m.Comment
-   */
-  private static void addComment(Activity activity, Comment comment) {
-    ActivityObject obj = new ActivityObject();
-
-    obj.setId(comment.getId());
-    obj.setPublished(new DateTime(String.valueOf(comment.getPublished())));
-    obj.setUpdated(new DateTime(String.valueOf(comment.getUpdated())));
-    obj.setContent(comment.getObject().getContent());
-    obj.setObjectType(comment.getObject().getObjectType());
-
-    Map<String, Object> extensions = new HashMap<>();
-    extensions.put("googlePlus", comment);
-
-    obj.setAdditionalProperty("extensions", extensions);
-
-    if (activity.getObject() == null) {
-      activity.setObject(new ActivityObject());
-    }
-    if (activity.getObject().getAttachments() == null) {
-      activity.getObject().setAttachments(new ArrayList<>());
-    }
-
-    activity.getObject().getAttachments().add(obj);
-  }
-
-  /**
-   * Add in necessary extensions from the passed in {@link com.google.api.services.plus.model.Activity} to the
-   * {@link Activity} object
-   *
-   * @param activity output o.a.s.p.j.Activity
-   * @param gPlusActivity input c.g.a.s.p.m.Activity
-   */
-  private static void addGPlusExtensions(Activity activity, com.google.api.services.plus.model.Activity gPlusActivity) {
-
-    activity.getAdditionalProperties().put("googlePlus", gPlusActivity);
-
-    Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
-
-    com.google.api.services.plus.model.Activity.PlusObject object = gPlusActivity.getObject();
-
-    if (object != null) {
-      com.google.api.services.plus.model.Activity.PlusObject.Plusoners plusoners = object.getPlusoners();
-      if (plusoners != null) {
-        Map<String, Object> likes = new HashMap<>();
-        likes.put("count", plusoners.getTotalItems());
-        extensions.put("likes", likes);
-      }
-
-      com.google.api.services.plus.model.Activity.PlusObject.Resharers resharers = object.getResharers();
-      if (resharers != null) {
-        Map<String, Object> rebroadcasts = new HashMap<>();
-        rebroadcasts.put("count", resharers.getTotalItems());
-        extensions.put("rebroadcasts", rebroadcasts);
-      }
-
-      extensions.put("keywords", object.getContent());
-    }
-  }
-
-  /**
-   * Set the {@link ActivityObject} field given the passed in
-   * {@link com.google.api.services.plus.model.Activity.PlusObject}
-   *
-   * @param activity output $.object as o.a.s.p.j.ActivityObject
-   * @param plusObject input c.g.a.s.p.m.Activity.PlusObject
-   */
-  private static void setObject(Activity activity, com.google.api.services.plus.model.Activity.PlusObject plusObject) {
-    if (plusObject != null) {
-      ActivityObject activityObject = new ActivityObject();
-
-      activityObject.setContent(plusObject.getContent());
-      activityObject.setObjectType(plusObject.getObjectType());
-
-      List<ActivityObject> attachmentsList = new ArrayList<>();
-      for (com.google.api.services.plus.model.Activity.PlusObject.Attachments attachments : plusObject.getAttachments()) {
-        ActivityObject attach = new ActivityObject();
-
-        attach.setContent(attachments.getContent());
-        attach.setDisplayName(attachments.getDisplayName());
-        attach.setObjectType(attachments.getObjectType());
-        attach.setUrl(attachments.getUrl());
-
-        Image image = new Image();
-        com.google.api.services.plus.model.Activity.PlusObject.Attachments.Image image1 = attachments.getImage();
-
-        if (image1 != null) {
-          image.setUrl(image1.getUrl());
-          attach.setImage(image);
-        }
-
-        attachmentsList.add(attach);
-      }
-
-      activityObject.setAttachments(attachmentsList);
-
-      activity.setObject(activityObject);
-    }
-  }
-
-  /**
-   * Given a {@link com.google.api.services.plus.model.Activity.Actor} object, return a fully fleshed
-   * out {@link ActivityObject} actor
-   *
-   * @param gPlusActor input c.g.a.s.p.m.Activity.Actor
-   * @return {@link ActivityObject} output $.actor as o.a.s.p.j.ActivityObject
-   */
-  private static ActivityObject buildActor(com.google.api.services.plus.model.Activity.Actor gPlusActor) {
-    ActivityObject actor = new ActivityObject();
-
-    actor.setDisplayName(gPlusActor.getDisplayName());
-    actor.setId(formatId(String.valueOf(gPlusActor.getId())));
-    actor.setUrl(gPlusActor.getUrl());
-
-    Image image = new Image();
-    com.google.api.services.plus.model.Activity.Actor.Image googlePlusImage = gPlusActor.getImage();
-
-    if (googlePlusImage != null) {
-      image.setUrl(googlePlusImage.getUrl());
-    }
-    actor.setImage(image);
-
-    return actor;
-  }
-
-  /**
-   * Extract the relevant details from the passed in {@link Person} object and build
-   * an actor with them
-   *
-   * @param person Person
-   * @return Actor constructed with relevant Person details
-   */
-  private static ActivityObject buildActor(Person person) {
-    ActivityObject actor = new ActivityObject();
-
-    actor.setUrl(person.getUrl());
-    actor.setDisplayName(person.getDisplayName());
-    actor.setId(formatId(String.valueOf(person.getId())));
-
-    if (person.getAboutMe() != null) {
-      actor.setSummary(person.getAboutMe());
-    } else if (person.getTagline() != null) {
-      actor.setSummary(person.getTagline());
-    }
-
-    Image image = new Image();
-    Person.Image googlePlusImage = person.getImage();
-
-    if (googlePlusImage != null) {
-      image.setUrl(googlePlusImage.getUrl());
-    }
-    actor.setImage(image);
-
-    Map<String, Object> extensions = new HashMap<>();
-
-    extensions.put("followers", person.getCircledByCount());
-    extensions.put("googleplus", person);
-    actor.setAdditionalProperty("extensions", extensions);
-
-    return actor;
-  }
-
-  /**
-   * Gets the common googleplus {@link Provider} object
-   * @return a provider object representing GooglePlus
-   */
-  public static Provider getProvider() {
-    Provider provider = new Provider();
-    provider.setId("id:providers:googleplus");
-    provider.setDisplayName("GooglePlus");
-    return provider;
-  }
-
-  /**
-   * Formats the ID to conform with the Apache Streams activity ID convention
-   * @param idparts the parts of the ID to join
-   * @return a valid Activity ID in format "id:googleplus:part1:part2:...partN"
-   */
-  public static String formatId(String... idparts) {
-    return String.join(":",
-        Stream.concat(Arrays.stream(new String[]{"id:googleplus"}), Arrays.stream(idparts)).collect(Collectors.toList()));
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/processor/GooglePlusCommentProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/processor/GooglePlusCommentProcessor.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/processor/GooglePlusCommentProcessor.java
new file mode 100644
index 0000000..1128a93
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/processor/GooglePlusCommentProcessor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.processor;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.gplus.serializer.util.GooglePlusActivityUtil;
+import org.apache.streams.pojo.json.Activity;
+
+import com.google.api.services.plus.model.Comment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * GooglePlusCommentProcessor collects comments about a google plus activity.
+ */
+public class GooglePlusCommentProcessor implements StreamsProcessor {
+
+  private static final String STREAMS_ID = "GooglePlusCommentProcessor";
+  private static final Logger LOGGER = LoggerFactory.getLogger(GooglePlusCommentProcessor.class);
+  private int count;
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
+    StreamsDatum result = null;
+
+    try {
+      Object item = entry.getDocument();
+      LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
+
+      //Get G+ activity ID from our own activity ID
+      if (item instanceof Activity) {
+        Activity activity = (Activity) item;
+        String activityId = getGPlusID(activity.getId());
+
+        //Call Google Plus API to get list of comments for this activity ID
+        /* TODO: FILL ME OUT WITH THE API CALL **/
+        List<Comment> comments = new ArrayList<>();
+
+        GooglePlusActivityUtil.updateActivity(comments, activity);
+        result = new StreamsDatum(activity);
+      }
+    } catch (Exception ex) {
+      ex.printStackTrace();
+      LOGGER.error("Exception while converting Comment to Activity: {}", ex.getMessage());
+    }
+
+    if ( result != null ) {
+      return Stream.of(result).collect(Collectors.toList());
+    } else {
+      return new ArrayList<>();
+    }
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    count = 0;
+  }
+
+  @Override
+  public void cleanUp() {
+
+  }
+
+  private String getGPlusID(String activityId) {
+    String[] activityParts = activityId.split(":");
+    return (activityParts.length > 0) ? activityParts[activityParts.length - 1] : "";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/processor/GooglePlusTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/processor/GooglePlusTypeConverter.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/processor/GooglePlusTypeConverter.java
new file mode 100644
index 0000000..ff11b45
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/processor/GooglePlusTypeConverter.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.processor;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.gplus.serializer.util.GPlusActivityDeserializer;
+import org.apache.streams.gplus.serializer.util.GPlusEventClassifier;
+import org.apache.streams.gplus.serializer.util.GPlusPersonDeserializer;
+import org.apache.streams.gplus.serializer.util.GooglePlusActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.api.services.plus.model.Person;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * GooglePlusTypeConverter is a StreamsProcessor that converts gplus activities to activitystreams activities.
+ */
+public class GooglePlusTypeConverter implements StreamsProcessor {
+
+  public static final String STREAMS_ID = "GooglePlusTypeConverter";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(GooglePlusTypeConverter.class);
+  private StreamsJacksonMapper mapper;
+  private Queue<Person> inQueue;
+  private Queue<StreamsDatum> outQueue;
+  private int count = 0;
+
+  public GooglePlusTypeConverter() {}
+
+  public Queue<StreamsDatum> getProcessorOutputQueue() {
+    return outQueue;
+  }
+
+  public void setProcessorInputQueue(Queue<Person> inputQueue) {
+    inQueue = inputQueue;
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
+    StreamsDatum result = null;
+
+    try {
+      Object item = entry.getDocument();
+
+      LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
+      Activity activity = null;
+
+      if (item instanceof String) {
+        item = deserializeItem(item);
+      }
+
+      if (item instanceof Person) {
+        activity = new Activity();
+        GooglePlusActivityUtil.updateActivity((Person) item, activity);
+      } else if (item instanceof com.google.api.services.plus.model.Activity) {
+        activity = new Activity();
+        GooglePlusActivityUtil.updateActivity((com.google.api.services.plus.model.Activity) item, activity);
+      }
+
+      if (activity != null) {
+        result = new StreamsDatum(activity);
+        count++;
+      }
+    } catch (Exception ex) {
+      ex.printStackTrace();
+      LOGGER.error("Exception while converting Person to Activity: {}", ex.getMessage());
+    }
+
+    if (result != null) {
+      return Stream.of(result).collect(Collectors.toList());
+    } else {
+      return new ArrayList<>();
+    }
+  }
+
+  private Object deserializeItem(Object item) {
+    try {
+      Class klass = GPlusEventClassifier.detectClass((String) item);
+
+      if (klass.equals(Person.class)) {
+        item = mapper.readValue((String) item, Person.class);
+      } else if (klass.equals(com.google.api.services.plus.model.Activity.class)) {
+        item = mapper.readValue((String) item, com.google.api.services.plus.model.Activity.class);
+      }
+    } catch (Exception ex) {
+      LOGGER.error("Exception while trying to deserializeItem: {}", ex);
+    }
+
+    return item;
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    mapper = StreamsJacksonMapper.getInstance();
+
+    SimpleModule simpleModule = new SimpleModule();
+    simpleModule.addDeserializer(Person.class, new GPlusPersonDeserializer());
+    mapper.registerModule(simpleModule);
+
+    simpleModule = new SimpleModule();
+    simpleModule.addDeserializer(com.google.api.services.plus.model.Activity.class, new GPlusActivityDeserializer());
+    mapper.registerModule(simpleModule);
+  }
+
+  @Override
+  public void cleanUp() {
+    //No-op
+  }
+}


Mime
View raw message