Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 81DFB200C23 for ; Wed, 8 Feb 2017 05:33:12 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 805C0160B6C; Wed, 8 Feb 2017 04:33:12 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 45A5E160B3E for ; Wed, 8 Feb 2017 05:33:10 +0100 (CET) Received: (qmail 31762 invoked by uid 500); 8 Feb 2017 04:33:09 -0000 Mailing-List: contact commits-help@streams.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@streams.incubator.apache.org Delivered-To: mailing list commits@streams.incubator.apache.org Received: (qmail 31744 invoked by uid 99); 8 Feb 2017 04:33:09 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Feb 2017 04:33:09 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id CCB0CC0258 for ; Wed, 8 Feb 2017 04:33:08 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id Brvj0ZIxrHBw for ; Wed, 8 Feb 2017 04:32:56 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 41B585F473 for ; Wed, 8 Feb 2017 04:32:48 +0000 (UTC) Received: (qmail 31240 invoked by uid 99); 8 Feb 2017 04:32:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Feb 2017 04:32:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 36080E04B2; Wed, 8 Feb 2017 04:32:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: smarthi@apache.org To: commits@streams.incubator.apache.org Date: Wed, 08 Feb 2017 04:32:54 -0000 Message-Id: <2e0fb2746a8542f6a2a82f27bfb345df@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [8/9] incubator-streams git commit: STREAMS-463: Move every class in all repos underneath org.apache.streams, this closes apache/incubator-streams#356 archived-at: Wed, 08 Feb 2017 04:33:12 -0000 http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java deleted file mode 100644 index d9c7dd7..0000000 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gplus.provider; - -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.google.gplus.GPlusConfiguration; -import org.apache.streams.google.gplus.configuration.UserInfo; -import org.apache.streams.util.ComponentUtils; -import org.apache.streams.util.api.requests.backoff.BackOffStrategy; -import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy; - -import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.http.javanet.NetHttpTransport; -import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.api.services.plus.Plus; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.gson.Gson; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.math.BigInteger; -import java.security.GeneralSecurityException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Provider that creates a GPlus client and will run task that queue data to an outing queue. - */ -public abstract class AbstractGPlusProvider implements StreamsProvider { - - public static final String STREAMS_ID = "AbstractGPlusProvider"; - - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractGPlusProvider.class); - private static final Set SCOPE = new HashSet() { - { - add("https://www.googleapis.com/auth/plus.login"); - } - }; - private static final int MAX_BATCH_SIZE = 1000; - - private static final HttpTransport TRANSPORT = new NetHttpTransport(); - private static final JacksonFactory JSON_FACTORY = new JacksonFactory(); - private static final Gson GSON = new Gson(); - - private GPlusConfiguration config; - - List> futures = new ArrayList<>(); - - private ListeningExecutorService executor; - - private BlockingQueue datumQueue; - private AtomicBoolean isComplete; - private boolean previousPullWasEmpty; - - protected GoogleCredential credential; - protected Plus plus; - - public AbstractGPlusProvider() { - this.config = new ComponentConfigurator<>(GPlusConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("gplus")); - } - - public AbstractGPlusProvider(GPlusConfiguration config) { - this.config = config; - } - - @Override - public void prepare(Object configurationObject) { - - Objects.requireNonNull(config.getOauth().getPathToP12KeyFile()); - Objects.requireNonNull(config.getOauth().getAppName()); - Objects.requireNonNull(config.getOauth().getServiceAccountEmailAddress()); - - try { - this.plus = createPlusClient(); - } catch (IOException | GeneralSecurityException ex) { - LOGGER.error("Failed to created oauth for GPlus : {}", ex); - throw new RuntimeException(ex); - } - // GPlus rate limits you to 5 calls per second, so there is not a need to execute more than one - // collector unless you have multiple oauth tokens - //TODO make this configurable based on the number of oauth tokens - this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); - this.datumQueue = new LinkedBlockingQueue<>(1000); - this.isComplete = new AtomicBoolean(false); - this.previousPullWasEmpty = false; - } - - @Override - public void startStream() { - - BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2); - for (UserInfo user : this.config.getGooglePlusUsers()) { - if (this.config.getDefaultAfterDate() != null && user.getAfterDate() == null) { - user.setAfterDate(this.config.getDefaultAfterDate()); - } - if (this.config.getDefaultBeforeDate() != null && user.getBeforeDate() == null) { - user.setBeforeDate(this.config.getDefaultBeforeDate()); - } - this.executor.submit(getDataCollector(backOffStrategy, this.datumQueue, this.plus, user)); - } - this.executor.shutdown(); - } - - protected abstract Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue queue, Plus plus, UserInfo userInfo); - - @Override - public String getId() { - return STREAMS_ID; - } - - @Override - public StreamsResultSet readCurrent() { - BlockingQueue batch = new LinkedBlockingQueue<>(); - int batchCount = 0; - while (!this.datumQueue.isEmpty() && batchCount < MAX_BATCH_SIZE) { - StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.datumQueue); - if (datum != null) { - ++batchCount; - ComponentUtils.offerUntilSuccess(datum, batch); - } - } - boolean pullIsEmpty = batch.isEmpty() && this.datumQueue.isEmpty() && this.executor.isTerminated(); - this.isComplete.set(this.previousPullWasEmpty && pullIsEmpty); - this.previousPullWasEmpty = pullIsEmpty; - return new StreamsResultSet(batch); - } - - @Override - public StreamsResultSet readNew(BigInteger sequence) { - return null; - } - - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return null; - } - - @VisibleForTesting - protected Plus createPlusClient() throws IOException, GeneralSecurityException { - credential = new GoogleCredential.Builder() - .setJsonFactory(JSON_FACTORY) - .setTransport(TRANSPORT) - .setServiceAccountScopes(SCOPE) - .setServiceAccountId(this.config.getOauth().getServiceAccountEmailAddress()) - .setServiceAccountPrivateKeyFromP12File(new File(this.config.getOauth().getPathToP12KeyFile())) - .build(); - return new Plus.Builder(TRANSPORT,JSON_FACTORY, credential).setApplicationName(this.config.getOauth().getAppName()).build(); - } - - @Override - public void cleanUp() { - ComponentUtils.shutdownExecutor(this.executor, 10, 10); - this.executor = null; - } - - public GPlusConfiguration getConfig() { - return config; - } - - public void setConfig(GPlusConfiguration config) { - this.config = config; - } - - /** - * Set and overwrite the default before date that was read from the configuration file. - * @param defaultBeforeDate defaultBeforeDate - */ - public void setDefaultBeforeDate(DateTime defaultBeforeDate) { - this.config.setDefaultBeforeDate(defaultBeforeDate); - } - - /** - * Set and overwrite the default after date that was read from teh configuration file. - * @param defaultAfterDate defaultAfterDate - */ - public void setDefaultAfterDate(DateTime defaultAfterDate) { - this.config.setDefaultAfterDate(defaultAfterDate); - } - - /** - * Sets and overwrite the user info from the configuaration file. Uses the defaults before and after dates. - * @param userIds userIds - */ - public void setUserInfoWithDefaultDates(Set userIds) { - List gplusUsers = new LinkedList<>(); - for (String userId : userIds) { - UserInfo user = new UserInfo(); - user.setUserId(userId); - user.setAfterDate(this.config.getDefaultAfterDate()); - user.setBeforeDate(this.config.getDefaultBeforeDate()); - gplusUsers.add(user); - } - this.config.setGooglePlusUsers(gplusUsers); - } - - /** - * Set and overwrite user into from the configuration file. Only sets after date. - * @param usersAndAfterDates usersAndAfterDates - */ - public void setUserInfoWithAfterDate(Map usersAndAfterDates) { - List gplusUsers = new LinkedList<>(); - for (String userId : usersAndAfterDates.keySet()) { - UserInfo user = new UserInfo(); - user.setUserId(userId); - user.setAfterDate(usersAndAfterDates.get(userId)); - gplusUsers.add(user); - } - this.config.setGooglePlusUsers(gplusUsers); - } - - @Override - public boolean isRunning() { - if (datumQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) { - LOGGER.info("Completed"); - isComplete.set(true); - LOGGER.info("Exiting"); - } - return !isComplete.get(); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java deleted file mode 100644 index eda55d2..0000000 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gplus.provider; - -import org.apache.streams.data.ActivitySerializer; -import org.apache.streams.pojo.json.Activity; - -import com.google.gplus.serializer.util.GooglePlusActivityUtil; -import org.apache.commons.lang.NotImplementedException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -/** - * GPlusActivitySerializer converts gplus activities to as1 activities. - */ -public class GPlusActivitySerializer implements ActivitySerializer { - - private static final Logger LOGGER = LoggerFactory.getLogger(GPlusActivitySerializer.class); - - AbstractGPlusProvider provider; - - public GPlusActivitySerializer(AbstractGPlusProvider provider) { - - this.provider = provider; - } - - public GPlusActivitySerializer() { - } - - @Override - public String serializationFormat() { - return "gplus.v1"; - } - - @Override - public com.google.api.services.plus.model.Activity serialize(Activity deserialized) { - throw new NotImplementedException("Not currently implemented"); - } - - @Override - public Activity deserialize(com.google.api.services.plus.model.Activity gplusActivity) { - Activity activity = new Activity(); - - GooglePlusActivityUtil.updateActivity(gplusActivity, activity); - return activity; - } - - @Override - public List deserializeAll(List serializedList) { - throw new NotImplementedException("Not currently implemented"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java deleted file mode 100644 index 351e5be..0000000 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gplus.provider; - -import org.apache.streams.util.api.requests.backoff.BackOffException; -import org.apache.streams.util.api.requests.backoff.BackOffStrategy; - -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * GPlusDataCollector collects GPlus Data on behalf of providers. - */ -public abstract class GPlusDataCollector implements Runnable { - - private static final Logger LOGGER = LoggerFactory.getLogger(GPlusDataCollector.class); - - /** - * Looks at the status code of the exception. If the code indicates that the request should be retried, - * it executes the back off strategy and returns true. - * @param gjre GoogleJsonResponseException - * @param backOff BackOffStrategy - * @return returns true if the error code of the exception indicates the request should be retried. - */ - public boolean backoffAndIdentifyIfRetry(GoogleJsonResponseException gjre, BackOffStrategy backOff) throws BackOffException { - boolean tryAgain = false; - switch (gjre.getStatusCode()) { - case 400 : - LOGGER.warn("Bad Request : {}", gjre); - break; - case 401 : - LOGGER.warn("Invalid Credentials : {}", gjre); - break; - case 403 : - LOGGER.warn("Possible rate limit exception. Retrying. : {}", gjre.getMessage()); - backOff.backOff(); - tryAgain = true; - break; - case 503 : - LOGGER.warn("Google Backend Service Error : {}", gjre); - break; - default: - LOGGER.warn("Google Service returned error : {}", gjre); - tryAgain = true; - backOff.backOff(); - break; - } - return tryAgain; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java deleted file mode 100644 index 31d8b5c..0000000 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gplus.provider; - -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.google.gplus.configuration.UserInfo; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.util.api.requests.backoff.BackOffStrategy; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.fasterxml.jackson.databind.ser.std.StdSerializer; -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.services.plus.Plus; -import com.google.api.services.plus.model.Activity; -import com.google.api.services.plus.model.ActivityFeed; -import com.google.gplus.serializer.util.GPlusActivityDeserializer; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.concurrent.BlockingQueue; - -/** - * Collects the public activities of a GPlus user. Has ability to filter by date ranges. - */ -public class GPlusUserActivityCollector extends GPlusDataCollector { - - /** - * Key for all public activities - * https://developers.google.com/+/api/latest/activities/list - */ - private static final String PUBLIC_COLLECTION = "public"; - /** - * Max results allowed per request - * https://developers.google.com/+/api/latest/activities/list - */ - private static final long MAX_RESULTS = 100; - private static final int MAX_ATTEMPTS = 5; - private static final Logger LOGGER = LoggerFactory.getLogger(GPlusUserActivityCollector.class); - private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - - static { //set up mapper for Google Activity Object - SimpleModule simpleModule = new SimpleModule(); - simpleModule.addDeserializer(Activity.class, new GPlusActivityDeserializer()); - simpleModule.addSerializer( - com.google.api.client.util.DateTime.class, - new StdSerializer(com.google.api.client.util.DateTime.class) { - @Override - public void serialize( - com.google.api.client.util.DateTime dateTime, - JsonGenerator jsonGenerator, - SerializerProvider serializerProvider) - throws IOException { - jsonGenerator.writeString(dateTime.toStringRfc3339()); - } - }); - MAPPER.registerModule(simpleModule); - MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } - - private BlockingQueue datumQueue; - private BackOffStrategy backOff; - private Plus plus; - private UserInfo userInfo; - - /** - * GPlusUserActivityCollector constructor. - * @param plus Plus - * @param datumQueue BlockingQueue - * @param backOff BackOffStrategy - * @param userInfo UserInfo - */ - public GPlusUserActivityCollector(Plus plus, BlockingQueue datumQueue, BackOffStrategy backOff, UserInfo userInfo) { - this.plus = plus; - this.datumQueue = datumQueue; - this.backOff = backOff; - this.userInfo = userInfo; - } - - @Override - public void run() { - collectActivityData(); - } - - protected void collectActivityData() { - try { - ActivityFeed feed = null; - boolean tryAgain = false; - int attempt = 0; - DateTime afterDate = userInfo.getAfterDate(); - DateTime beforeDate = userInfo.getBeforeDate(); - do { - try { - if (feed == null) { - feed = this.plus.activities().list(this.userInfo.getUserId(), PUBLIC_COLLECTION) - .setMaxResults(MAX_RESULTS).execute(); - } else { - feed = this.plus.activities().list(this.userInfo.getUserId(), PUBLIC_COLLECTION) - .setMaxResults(MAX_RESULTS) - .setPageToken(feed.getNextPageToken()).execute(); - } - this.backOff.reset(); //successful pull reset api. - for (com.google.api.services.plus.model.Activity activity : feed.getItems()) { - DateTime published = new DateTime(activity.getPublished().getValue()); - if ((afterDate == null && beforeDate == null) - || (beforeDate == null && afterDate.isBefore(published)) - || (afterDate == null && beforeDate.isAfter(published)) - || ((afterDate != null && beforeDate != null) && (afterDate.isBefore(published) && beforeDate.isAfter(published)))) { - String json = MAPPER.writeValueAsString(activity); - this.datumQueue.put(new StreamsDatum(json, activity.getId())); - } else if (afterDate != null && afterDate.isAfter(published)) { - feed.setNextPageToken(null); // do not fetch next page - break; - } - } - } catch (GoogleJsonResponseException gjre) { - tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOff); - ++attempt; - } - } - while ((tryAgain || (feed != null && feed.getNextPageToken() != null)) && attempt < MAX_ATTEMPTS); - } catch (Throwable th) { - if (th instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - th.printStackTrace(); - LOGGER.warn("Unable to pull Activities for user={} : {}",this.userInfo.getUserId(), th); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java deleted file mode 100644 index 97b08fd..0000000 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gplus.provider; - -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.google.gplus.GPlusConfiguration; -import org.apache.streams.google.gplus.configuration.UserInfo; -import org.apache.streams.util.api.requests.backoff.BackOffStrategy; - -import com.google.api.services.plus.Plus; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Uninterruptibles; -import com.google.gson.Gson; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; - -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.PrintStream; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - * Retrieve recent activity from a list of accounts. - * - *

- * To use from command line: - * - *

- * Supply (at least) the following required configuration in application.conf: - * - *

- * gplus.oauth.pathToP12KeyFile - * gplus.oauth.serviceAccountEmailAddress - * gplus.apiKey - * gplus.googlePlusUsers - * - *

- * Launch using: - * - *

- * mvn exec:java -Dexec.mainClass=com.google.gplus.provider.GPlusUserActivityProvider -Dexec.args="application.conf activity.json" - */ -public class GPlusUserActivityProvider extends AbstractGPlusProvider { - - private static final String STREAMS_ID = "GPlusUserActivityProvider"; - - public GPlusUserActivityProvider() { - super(); - } - - public GPlusUserActivityProvider(GPlusConfiguration config) { - super(config); - } - - @Override - public String getId() { - return STREAMS_ID; - } - - @Override - protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue queue, Plus plus, UserInfo userInfo) { - return new GPlusUserActivityCollector(plus, queue, strategy, userInfo); - } - - /** - * Retrieve recent activity from a list of accounts. - * @param args args - * @throws Exception Exception - */ - public static void main(String[] args) throws Exception { - - Preconditions.checkArgument(args.length >= 2); - - String configfile = args[0]; - String outfile = args[1]; - - Config reference = ConfigFactory.load(); - File file = new File(configfile); - assert (file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)); - - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); - GPlusConfiguration config = new ComponentConfigurator<>(GPlusConfiguration.class).detectConfiguration(typesafe, "gplus"); - GPlusUserActivityProvider provider = new GPlusUserActivityProvider(config); - - Gson gson = new Gson(); - - PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); - provider.prepare(config); - provider.startStream(); - do { - Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - for (StreamsDatum datum : provider.readCurrent()) { - String json; - if (datum.getDocument() instanceof String) { - json = (String) datum.getDocument(); - } else { - json = gson.toJson(datum.getDocument()); - } - outStream.println(json); - } - } - while ( provider.isRunning()); - provider.cleanUp(); - outStream.flush(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java deleted file mode 100644 index e0df58d..0000000 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gplus.provider; - -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.google.gplus.configuration.UserInfo; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.util.api.requests.backoff.BackOffStrategy; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.services.plus.Plus; -import com.google.api.services.plus.model.Person; -import com.google.gplus.serializer.util.GPlusPersonDeserializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.BlockingQueue; - -/** - * Collects user profile information for a specific GPlus user. - */ -public class GPlusUserDataCollector extends GPlusDataCollector { - - private static final Logger LOGGER = LoggerFactory.getLogger(GPlusUserDataCollector.class); - private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - private static final int MAX_ATTEMPTS = 5; - - static { //set up Mapper for Person objects - SimpleModule simpleModule = new SimpleModule(); - simpleModule.addDeserializer(Person.class, new GPlusPersonDeserializer()); - MAPPER.registerModule(simpleModule); - MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } - - private BackOffStrategy backOffStrategy; - private Plus plus; - private BlockingQueue datumQueue; - private UserInfo userInfo; - - /** - * GPlusUserDataCollector constructor. - * @param plus Plus - * @param backOffStrategy BackOffStrategy - * @param datumQueue BlockingQueue of StreamsDatum - * @param userInfo UserInfo - */ - public GPlusUserDataCollector(Plus plus, BackOffStrategy backOffStrategy, BlockingQueue datumQueue, UserInfo userInfo) { - this.plus = plus; - this.backOffStrategy = backOffStrategy; - this.datumQueue = datumQueue; - this.userInfo = userInfo; - } - - protected void queueUserHistory() { - try { - boolean tryAgain; - int attempts = 0; - com.google.api.services.plus.model.Person person = null; - do { - try { - person = this.plus.people().get(userInfo.getUserId()).execute(); - this.backOffStrategy.reset(); - tryAgain = person == null; - } catch (GoogleJsonResponseException gjre) { - tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOffStrategy); - } - ++attempts; - } - while (tryAgain && attempts < MAX_ATTEMPTS); - String json = MAPPER.writeValueAsString(person); - this.datumQueue.put(new StreamsDatum(json, person.getId())); - } catch (Throwable throwable) { - LOGGER.warn("Unable to pull user data for user={} : {}", userInfo.getUserId(), throwable); - if (throwable instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - } - } - - @Override - public void run() { - queueUserHistory(); - } - - - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java deleted file mode 100644 index 28bcb55..0000000 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gplus.provider; - -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.google.gplus.GPlusConfiguration; -import org.apache.streams.google.gplus.configuration.UserInfo; -import org.apache.streams.util.api.requests.backoff.BackOffStrategy; - -import com.google.api.services.plus.Plus; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Uninterruptibles; -import com.google.gson.Gson; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; - -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.PrintStream; -import java.util.Iterator; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - * Retrieve current profile status for a list of accounts. - * - *

- * To use from command line: - * - *

- * Supply (at least) the following required configuration in application.conf: - * - *

- * gplus.oauth.pathToP12KeyFile - * gplus.oauth.serviceAccountEmailAddress - * gplus.apiKey - * gplus.googlePlusUsers - * - *

- * Launch using: - * - *

- * mvn exec:java -Dexec.mainClass=com.google.gplus.provider.GPlusUserDataProvider -Dexec.args="application.conf profiles.json" - */ -public class GPlusUserDataProvider extends AbstractGPlusProvider { - - public static final String STREAMS_ID = "GPlusUserDataProvider"; - - public GPlusUserDataProvider() { - super(); - } - - public GPlusUserDataProvider(GPlusConfiguration config) { - super(config); - } - - @Override - public String getId() { - return STREAMS_ID; - } - - @Override - protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue queue, Plus plus, UserInfo userInfo) { - return new GPlusUserDataCollector(plus, strategy, queue, userInfo); - } - - /** - * Retrieve current profile status for a list of accounts. - * @param args args - * @throws Exception Exception - */ - public static void main(String[] args) throws Exception { - - Preconditions.checkArgument(args.length >= 2); - - String configfile = args[0]; - String outfile = args[1]; - - Config reference = ConfigFactory.load(); - File file = new File(configfile); - assert (file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)); - - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); - GPlusConfiguration config = new ComponentConfigurator<>(GPlusConfiguration.class).detectConfiguration(typesafe, "gplus"); - GPlusUserDataProvider provider = new GPlusUserDataProvider(config); - - Gson gson = new Gson(); - - PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); - provider.prepare(config); - provider.startStream(); - do { - Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - Iterator iterator = provider.readCurrent().iterator(); - while (iterator.hasNext()) { - StreamsDatum datum = iterator.next(); - String json; - if (datum.getDocument() instanceof String) { - json = (String) datum.getDocument(); - } else { - json = gson.toJson(datum.getDocument()); - } - outStream.println(json); - } - } - while ( provider.isRunning()); - provider.cleanUp(); - outStream.flush(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusActivityDeserializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusActivityDeserializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusActivityDeserializer.java deleted file mode 100644 index f13267a..0000000 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusActivityDeserializer.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gplus.serializer.util; - -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonNode; -import com.google.api.client.util.DateTime; -import com.google.api.services.plus.model.Activity; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -/** - * Custom deserializer for GooglePlus' Person model. - */ -public class GPlusActivityDeserializer extends JsonDeserializer { - - private static final Logger LOGGER = LoggerFactory.getLogger(GPlusActivityDeserializer.class); - - /** - * Because the GooglePlus Activity object {@link com.google.api.services.plus.model.Activity} contains complex objects - * within its hierarchy, we have to use a custom deserializer - * - * @param jsonParser jsonParser - * @param deserializationContext deserializationContext - * @return The deserialized {@link com.google.api.services.plus.model.Activity} object - * @throws IOException IOException - * @throws JsonProcessingException JsonProcessingException - */ - @Override - public Activity deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException { - - JsonNode node = jsonParser.getCodec().readTree(jsonParser); - Activity activity = new Activity(); - - try { - activity.setUrl(node.get("url").asText()); - activity.setEtag(node.get("etag").asText()); - activity.setTitle(node.get("title").asText()); - activity.setPublished(DateTime.parseRfc3339(node.get("published").asText())); - activity.setUpdated(DateTime.parseRfc3339(node.get("updated").asText())); - activity.setId(node.get("id").asText()); - activity.setVerb(node.get("verb").asText()); - - activity.setActor(buildActor(node)); - - activity.setObject(buildPlusObject(node)); - } catch (Exception ex) { - LOGGER.error("Exception while trying to deserialize activity object: {}", ex); - } - - return activity; - } - - /** - * Given a raw JsonNode, build out the G+ {@link com.google.api.services.plus.model.Activity.Actor} object - * - * @param node node - * @return {@link com.google.api.services.plus.model.Activity.Actor} object - */ - private Activity.Actor buildActor(JsonNode node) { - Activity.Actor actor = new Activity.Actor(); - JsonNode actorNode = node.get("actor"); - - actor.setId(actorNode.get("id").asText()); - actor.setDisplayName(actorNode.get("displayName").asText()); - actor.setUrl(actorNode.get("url").asText()); - - Activity.Actor.Image image = new Activity.Actor.Image(); - JsonNode imageNode = actorNode.get("image"); - image.setUrl(imageNode.get("url").asText()); - - actor.setImage(image); - - return actor; - } - - /** - * Given a JsonNode, build out all aspects of the {@link com.google.api.services.plus.model.Activity.PlusObject} object - * - * @param node node - * @return {@link com.google.api.services.plus.model.Activity.PlusObject} object - */ - private Activity.PlusObject buildPlusObject(JsonNode node) { - Activity.PlusObject object = new Activity.PlusObject(); - JsonNode objectNode = node.get("object"); - object.setObjectType(objectNode.get("objectType").asText()); - object.setContent(objectNode.get("content").asText()); - object.setUrl(objectNode.get("url").asText()); - - Activity.PlusObject.Replies replies = new Activity.PlusObject.Replies(); - JsonNode repliesNode = objectNode.get("replies"); - replies.setTotalItems(repliesNode.get("totalItems").asLong()); - replies.setSelfLink(repliesNode.get("selfLink").asText()); - object.setReplies(replies); - - Activity.PlusObject.Plusoners plusoners = new Activity.PlusObject.Plusoners(); - JsonNode plusonersNode = objectNode.get("plusoners"); - plusoners.setTotalItems(plusonersNode.get("totalItems").asLong()); - plusoners.setSelfLink(plusonersNode.get("selfLink").asText()); - object.setPlusoners(plusoners); - - Activity.PlusObject.Resharers resharers = new Activity.PlusObject.Resharers(); - JsonNode resharersNode = objectNode.get("resharers"); - resharers.setTotalItems(resharersNode.get("totalItems").asLong()); - resharers.setSelfLink(resharersNode.get("selfLink").asText()); - object.setResharers(resharers); - - object.setAttachments(buildAttachments(objectNode));//attachments); - - return object; - } - - /** - * Given a raw JsonNode representation of an Activity's attachments, build out that - * list of {@link com.google.api.services.plus.model.Activity.PlusObject.Attachments} objects - * - * @param objectNode objectNode - * @return list of {@link com.google.api.services.plus.model.Activity.PlusObject.Attachments} objects - */ - private List buildAttachments(JsonNode objectNode) { - List attachments = new ArrayList<>(); - if ( objectNode.has("attachments") ) { - for (JsonNode attachmentNode : objectNode.get("attachments")) { - Activity.PlusObject.Attachments attachments1 = new Activity.PlusObject.Attachments(); - attachments1.setObjectType(attachmentNode.get("objectType").asText()); - if (attachmentNode.has("displayName")) { - attachments1.setDisplayName(attachmentNode.get("displayName").asText()); - } - if (attachmentNode.has("content")) { - attachments1.setContent(attachmentNode.get("content").asText()); - } - if (attachmentNode.has("url")) { - attachments1.setUrl(attachmentNode.get("url").asText()); - } - - if( attachmentNode.has("image")) { - Activity.PlusObject.Attachments.Image image1 = new Activity.PlusObject.Attachments.Image(); - JsonNode imageNode1 = attachmentNode.get("image"); - image1.setUrl(imageNode1.get("url").asText()); - attachments1.setImage(image1); - } - - attachments.add(attachments1); - } - } - return attachments; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusCommentDeserializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusCommentDeserializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusCommentDeserializer.java deleted file mode 100644 index 5b92c43..0000000 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusCommentDeserializer.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gplus.serializer.util; - -import org.apache.streams.jackson.StreamsJacksonMapper; - -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.client.util.DateTime; -import com.google.api.services.plus.model.Comment; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -/** - * GPlusCommentDeserializer converts gplus comments to as1 comments. - */ -public class GPlusCommentDeserializer extends JsonDeserializer { - - private static final Logger LOGGER = LoggerFactory.getLogger(GPlusActivityDeserializer.class); - - /** - * Because the GooglePlus Comment object {@link com.google.api.services.plus.model.Comment} contains complex objects - * within its hierarchy, we have to use a custom deserializer - * - * @param jsonParser jsonParser - * @param deserializationContext deserializationContext - * @return The deserialized {@link com.google.api.services.plus.model.Comment} object - * @throws java.io.IOException IOException - * @throws com.fasterxml.jackson.core.JsonProcessingException JsonProcessingException - */ - @Override - public Comment deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) - throws IOException, JsonProcessingException { - - JsonNode node = jsonParser.getCodec().readTree(jsonParser); - ObjectMapper objectMapper = StreamsJacksonMapper.getInstance(); - Comment comment = new Comment(); - - try { - comment.setEtag(node.get("etag").asText()); - comment.setVerb(node.get("verb").asText()); - comment.setId(node.get("id").asText()); - comment.setPublished(DateTime.parseRfc3339(node.get("published").asText())); - comment.setUpdated(DateTime.parseRfc3339(node.get("updated").asText())); - - Comment.Actor actor = new Comment.Actor(); - JsonNode actorNode = node.get("actor"); - actor.setDisplayName(actorNode.get("displayName").asText()); - actor.setUrl(actorNode.get("url").asText()); - - Comment.Actor.Image image = new Comment.Actor.Image(); - JsonNode imageNode = actorNode.get("image"); - image.setUrl(imageNode.get("url").asText()); - - actor.setImage(image); - - comment.setObject(objectMapper.readValue(objectMapper.writeValueAsString(node.get("object")), Comment.PlusObject.class)); - - comment.setSelfLink(node.get("selfLink").asText()); - - List replies = new ArrayList<>(); - for (JsonNode reply : node.get("inReplyTo")) { - Comment.InReplyTo irt = objectMapper.readValue(objectMapper.writeValueAsString(reply), Comment.InReplyTo.class); - replies.add(irt); - } - - comment.setInReplyTo(replies); - - Comment.Plusoners plusoners = new Comment.Plusoners(); - JsonNode plusonersNode = node.get("plusoners"); - plusoners.setTotalItems(plusonersNode.get("totalItems").asLong()); - comment.setPlusoners(plusoners); - } catch (Exception ex) { - LOGGER.error("Exception while trying to deserialize activity object: {}", ex); - } - - return comment; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusEventClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusEventClassifier.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusEventClassifier.java deleted file mode 100644 index 4923d3e..0000000 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusEventClassifier.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gplus.serializer.util; - -import org.apache.streams.jackson.StreamsJacksonMapper; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.api.services.plus.model.Activity; -import com.google.api.services.plus.model.Person; -import com.google.common.base.Preconditions; -import org.apache.commons.lang.StringUtils; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Objects; - -/** - * GPlusEventClassifier classifies GPlus Events. - */ -public class GPlusEventClassifier implements Serializable { - - private static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - private static final String ACTIVITY_IDENTIFIER = "\"plus#activity\""; - private static final String PERSON_IDENTIFIER = "\"plus#person\""; - - /** - * Detect likely class of String json. - * @param json String json - * @return likely class - */ - public static Class detectClass(String json) { - Objects.requireNonNull(json); - Preconditions.checkArgument(StringUtils.isNotEmpty(json)); - - ObjectNode objectNode; - try { - objectNode = (ObjectNode) mapper.readTree(json); - } catch (IOException ex) { - ex.printStackTrace(); - return null; - } - - if (objectNode.findValue("kind") != null && objectNode.get("kind").toString().equals(ACTIVITY_IDENTIFIER)) { - return Activity.class; - } else if (objectNode.findValue("kind") != null && objectNode.get("kind").toString().equals(PERSON_IDENTIFIER)) { - return Person.class; - } else { - return ObjectNode.class; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusPersonDeserializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusPersonDeserializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusPersonDeserializer.java deleted file mode 100644 index 38bb168..0000000 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusPersonDeserializer.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gplus.serializer.util; - -import org.apache.streams.jackson.StreamsJacksonMapper; - -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.services.plus.model.Person; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -/** - * Custom deserializer for GooglePlus' Person model. - */ -public class GPlusPersonDeserializer extends JsonDeserializer { - - private static final Logger LOGGER = LoggerFactory.getLogger(GPlusPersonDeserializer.class); - - /** - * Because the GooglePlus Person object contains complex objects within its hierarchy, we have to use - * a custom deserializer - * - * @param jsonParser jsonParser - * @param deserializationContext deserializationContext - * @return The deserialized {@link com.google.api.services.plus.model.Person} object - * @throws IOException IOException - * @throws JsonProcessingException JsonProcessingException - */ - @Override - public Person deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException { - ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - JsonNode node = jsonParser.getCodec().readTree(jsonParser); - Person person = new Person(); - try { - person.setId(node.get("id").asText()); - person.setCircledByCount((Integer) (node.get("circledByCount")).numberValue()); - person.setDisplayName(node.get("displayName").asText()); - if( node.has("etag")) { - person.setEtag(node.get("etag").asText()); - } - if( node.has("gender")) { - person.setGender(node.get("gender").asText()); - } - - Person.Image image = new Person.Image(); - if( node.has("image") ) { - JsonNode imageNode = node.get("image"); - image.setIsDefault(imageNode.get("isDefault").asBoolean()); - image.setUrl(imageNode.get("url").asText()); - person.setImage(image); - } - - person.setIsPlusUser(node.get("isPlusUser").asBoolean()); - person.setKind(node.get("kind").asText()); - - JsonNode nameNode = node.get("name"); - Person.Name name = mapper.readValue(mapper.writeValueAsString(nameNode), Person.Name.class); - person.setName(name); - - person.setObjectType(node.get("objectType").asText()); - - List organizations = new ArrayList<>(); - if( node.has("organizations")) { - for (JsonNode orgNode : node.get("organizations")) { - Person.Organizations org = mapper.readValue(mapper.writeValueAsString(orgNode), Person.Organizations.class); - organizations.add(org); - } - person.setOrganizations(organizations); - } - - person.setUrl(node.get("url").asText()); - person.setVerified(node.get("verified").asBoolean()); - - List emails = new ArrayList<>(); - - if ( node.has("emails")) { - for (JsonNode emailNode : node.get("emails")) { - Person.Emails email = mapper.readValue(mapper.writeValueAsString(emailNode), Person.Emails.class); - emails.add(email); - } - } - - if ( node.has("tagline")) { - person.setTagline(node.get("tagline").asText()); - } - if ( node.has("aboutMe")) { - person.setAboutMe(node.get("aboutMe").asText()); - } - } catch (Exception ex) { - LOGGER.error("Exception while trying to deserialize a Person object: {}", ex); - } - - return person; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GooglePlusActivityUtil.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GooglePlusActivityUtil.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GooglePlusActivityUtil.java deleted file mode 100644 index 27b25a2..0000000 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GooglePlusActivityUtil.java +++ /dev/null @@ -1,298 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gplus.serializer.util; - -import org.apache.streams.exceptions.ActivitySerializerException; -import org.apache.streams.pojo.extensions.ExtensionUtil; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.pojo.json.ActivityObject; -import org.apache.streams.pojo.json.Image; -import org.apache.streams.pojo.json.Provider; - -import com.google.api.services.plus.model.Comment; -import com.google.api.services.plus.model.Person; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** - * GooglePlusActivityUtil helps convert c.g.Person and c.g.Activity into o.a.s.p.j.o.Page and o.a.s.p.j.Activity. - */ -public class GooglePlusActivityUtil { - - private static final Logger LOGGER = LoggerFactory.getLogger(GooglePlusActivityUtil.class); - - /** - * Given a {@link Person} object and an - * {@link Activity} object, fill out the appropriate details. - * - * @param item Person - * @param activity Activity - * @throws ActivitySerializerException ActivitySerializerException - */ - public static void updateActivity(Person item, Activity activity) throws ActivitySerializerException { - activity.setActor(buildActor(item)); - activity.setVerb("update"); - - activity.setId(formatId(activity.getVerb(), Optional.ofNullable(item.getId()).orElse(null))); - - activity.setProvider(getProvider()); - } - - /** - * Given a {@link List} of {@link Comment} objects and an - * {@link Activity}, update that Activity to contain all comments - * - * @param comments input List of Comment - * @param activity output Activity - */ - public static void updateActivity(List comments, Activity activity) { - for (Comment comment : comments) { - addComment(activity, comment); - } - - Map extensions = ExtensionUtil.getInstance().ensureExtensions(activity); - extensions.put("comment_count", comments.size()); - } - - /** - * Given a Google Plus {@link com.google.api.services.plus.model.Activity}, - * convert that into an Activity streams formatted {@link Activity} - * - * @param gPlusActivity input c.g.a.s.p.m.Activity - * @param activity output o.a.s.p.j.Activity - */ - public static void updateActivity(com.google.api.services.plus.model.Activity gPlusActivity, Activity activity) { - activity.setActor(buildActor(gPlusActivity.getActor())); - activity.setVerb("post"); - activity.setTitle(gPlusActivity.getTitle()); - activity.setUrl(gPlusActivity.getUrl()); - activity.setProvider(getProvider()); - - if (gPlusActivity.getObject() != null) { - activity.setContent(gPlusActivity.getObject().getContent()); - } - - activity.setId(formatId(activity.getVerb(), Optional.ofNullable(gPlusActivity.getId()).orElse(null))); - - DateTime published = new DateTime(String.valueOf(gPlusActivity.getPublished())); - activity.setPublished(published); - - setObject(activity, gPlusActivity.getObject()); - addGPlusExtensions(activity, gPlusActivity); - } - - /** - * Adds a single {@link Comment} to the Object.Attachments - * section of the passed in {@link Activity} - * - * @param activity output o.a.s.p.j.Activity - * @param comment input c.g.a.s.p.m.Comment - */ - private static void addComment(Activity activity, Comment comment) { - ActivityObject obj = new ActivityObject(); - - obj.setId(comment.getId()); - obj.setPublished(new DateTime(String.valueOf(comment.getPublished()))); - obj.setUpdated(new DateTime(String.valueOf(comment.getUpdated()))); - obj.setContent(comment.getObject().getContent()); - obj.setObjectType(comment.getObject().getObjectType()); - - Map extensions = new HashMap<>(); - extensions.put("googlePlus", comment); - - obj.setAdditionalProperty("extensions", extensions); - - if (activity.getObject() == null) { - activity.setObject(new ActivityObject()); - } - if (activity.getObject().getAttachments() == null) { - activity.getObject().setAttachments(new ArrayList<>()); - } - - activity.getObject().getAttachments().add(obj); - } - - /** - * Add in necessary extensions from the passed in {@link com.google.api.services.plus.model.Activity} to the - * {@link Activity} object - * - * @param activity output o.a.s.p.j.Activity - * @param gPlusActivity input c.g.a.s.p.m.Activity - */ - private static void addGPlusExtensions(Activity activity, com.google.api.services.plus.model.Activity gPlusActivity) { - - activity.getAdditionalProperties().put("googlePlus", gPlusActivity); - - Map extensions = ExtensionUtil.getInstance().ensureExtensions(activity); - - com.google.api.services.plus.model.Activity.PlusObject object = gPlusActivity.getObject(); - - if (object != null) { - com.google.api.services.plus.model.Activity.PlusObject.Plusoners plusoners = object.getPlusoners(); - if (plusoners != null) { - Map likes = new HashMap<>(); - likes.put("count", plusoners.getTotalItems()); - extensions.put("likes", likes); - } - - com.google.api.services.plus.model.Activity.PlusObject.Resharers resharers = object.getResharers(); - if (resharers != null) { - Map rebroadcasts = new HashMap<>(); - rebroadcasts.put("count", resharers.getTotalItems()); - extensions.put("rebroadcasts", rebroadcasts); - } - - extensions.put("keywords", object.getContent()); - } - } - - /** - * Set the {@link ActivityObject} field given the passed in - * {@link com.google.api.services.plus.model.Activity.PlusObject} - * - * @param activity output $.object as o.a.s.p.j.ActivityObject - * @param plusObject input c.g.a.s.p.m.Activity.PlusObject - */ - private static void setObject(Activity activity, com.google.api.services.plus.model.Activity.PlusObject plusObject) { - if (plusObject != null) { - ActivityObject activityObject = new ActivityObject(); - - activityObject.setContent(plusObject.getContent()); - activityObject.setObjectType(plusObject.getObjectType()); - - List attachmentsList = new ArrayList<>(); - for (com.google.api.services.plus.model.Activity.PlusObject.Attachments attachments : plusObject.getAttachments()) { - ActivityObject attach = new ActivityObject(); - - attach.setContent(attachments.getContent()); - attach.setDisplayName(attachments.getDisplayName()); - attach.setObjectType(attachments.getObjectType()); - attach.setUrl(attachments.getUrl()); - - Image image = new Image(); - com.google.api.services.plus.model.Activity.PlusObject.Attachments.Image image1 = attachments.getImage(); - - if (image1 != null) { - image.setUrl(image1.getUrl()); - attach.setImage(image); - } - - attachmentsList.add(attach); - } - - activityObject.setAttachments(attachmentsList); - - activity.setObject(activityObject); - } - } - - /** - * Given a {@link com.google.api.services.plus.model.Activity.Actor} object, return a fully fleshed - * out {@link ActivityObject} actor - * - * @param gPlusActor input c.g.a.s.p.m.Activity.Actor - * @return {@link ActivityObject} output $.actor as o.a.s.p.j.ActivityObject - */ - private static ActivityObject buildActor(com.google.api.services.plus.model.Activity.Actor gPlusActor) { - ActivityObject actor = new ActivityObject(); - - actor.setDisplayName(gPlusActor.getDisplayName()); - actor.setId(formatId(String.valueOf(gPlusActor.getId()))); - actor.setUrl(gPlusActor.getUrl()); - - Image image = new Image(); - com.google.api.services.plus.model.Activity.Actor.Image googlePlusImage = gPlusActor.getImage(); - - if (googlePlusImage != null) { - image.setUrl(googlePlusImage.getUrl()); - } - actor.setImage(image); - - return actor; - } - - /** - * Extract the relevant details from the passed in {@link Person} object and build - * an actor with them - * - * @param person Person - * @return Actor constructed with relevant Person details - */ - private static ActivityObject buildActor(Person person) { - ActivityObject actor = new ActivityObject(); - - actor.setUrl(person.getUrl()); - actor.setDisplayName(person.getDisplayName()); - actor.setId(formatId(String.valueOf(person.getId()))); - - if (person.getAboutMe() != null) { - actor.setSummary(person.getAboutMe()); - } else if (person.getTagline() != null) { - actor.setSummary(person.getTagline()); - } - - Image image = new Image(); - Person.Image googlePlusImage = person.getImage(); - - if (googlePlusImage != null) { - image.setUrl(googlePlusImage.getUrl()); - } - actor.setImage(image); - - Map extensions = new HashMap<>(); - - extensions.put("followers", person.getCircledByCount()); - extensions.put("googleplus", person); - actor.setAdditionalProperty("extensions", extensions); - - return actor; - } - - /** - * Gets the common googleplus {@link Provider} object - * @return a provider object representing GooglePlus - */ - public static Provider getProvider() { - Provider provider = new Provider(); - provider.setId("id:providers:googleplus"); - provider.setDisplayName("GooglePlus"); - return provider; - } - - /** - * Formats the ID to conform with the Apache Streams activity ID convention - * @param idparts the parts of the ID to join - * @return a valid Activity ID in format "id:googleplus:part1:part2:...partN" - */ - public static String formatId(String... idparts) { - return String.join(":", - Stream.concat(Arrays.stream(new String[]{"id:googleplus"}), Arrays.stream(idparts)).collect(Collectors.toList())); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/processor/GooglePlusCommentProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/processor/GooglePlusCommentProcessor.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/processor/GooglePlusCommentProcessor.java new file mode 100644 index 0000000..1128a93 --- /dev/null +++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/processor/GooglePlusCommentProcessor.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.gplus.processor; + +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.gplus.serializer.util.GooglePlusActivityUtil; +import org.apache.streams.pojo.json.Activity; + +import com.google.api.services.plus.model.Comment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * GooglePlusCommentProcessor collects comments about a google plus activity. + */ +public class GooglePlusCommentProcessor implements StreamsProcessor { + + private static final String STREAMS_ID = "GooglePlusCommentProcessor"; + private static final Logger LOGGER = LoggerFactory.getLogger(GooglePlusCommentProcessor.class); + private int count; + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public List process(StreamsDatum entry) { + StreamsDatum result = null; + + try { + Object item = entry.getDocument(); + LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass()); + + //Get G+ activity ID from our own activity ID + if (item instanceof Activity) { + Activity activity = (Activity) item; + String activityId = getGPlusID(activity.getId()); + + //Call Google Plus API to get list of comments for this activity ID + /* TODO: FILL ME OUT WITH THE API CALL **/ + List comments = new ArrayList<>(); + + GooglePlusActivityUtil.updateActivity(comments, activity); + result = new StreamsDatum(activity); + } + } catch (Exception ex) { + ex.printStackTrace(); + LOGGER.error("Exception while converting Comment to Activity: {}", ex.getMessage()); + } + + if ( result != null ) { + return Stream.of(result).collect(Collectors.toList()); + } else { + return new ArrayList<>(); + } + } + + @Override + public void prepare(Object configurationObject) { + count = 0; + } + + @Override + public void cleanUp() { + + } + + private String getGPlusID(String activityId) { + String[] activityParts = activityId.split(":"); + return (activityParts.length > 0) ? activityParts[activityParts.length - 1] : ""; + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/processor/GooglePlusTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/processor/GooglePlusTypeConverter.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/processor/GooglePlusTypeConverter.java new file mode 100644 index 0000000..ff11b45 --- /dev/null +++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/processor/GooglePlusTypeConverter.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.gplus.processor; + +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.gplus.serializer.util.GPlusActivityDeserializer; +import org.apache.streams.gplus.serializer.util.GPlusEventClassifier; +import org.apache.streams.gplus.serializer.util.GPlusPersonDeserializer; +import org.apache.streams.gplus.serializer.util.GooglePlusActivityUtil; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; + +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.api.services.plus.model.Person; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * GooglePlusTypeConverter is a StreamsProcessor that converts gplus activities to activitystreams activities. + */ +public class GooglePlusTypeConverter implements StreamsProcessor { + + public static final String STREAMS_ID = "GooglePlusTypeConverter"; + + private static final Logger LOGGER = LoggerFactory.getLogger(GooglePlusTypeConverter.class); + private StreamsJacksonMapper mapper; + private Queue inQueue; + private Queue outQueue; + private int count = 0; + + public GooglePlusTypeConverter() {} + + public Queue getProcessorOutputQueue() { + return outQueue; + } + + public void setProcessorInputQueue(Queue inputQueue) { + inQueue = inputQueue; + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public List process(StreamsDatum entry) { + StreamsDatum result = null; + + try { + Object item = entry.getDocument(); + + LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass()); + Activity activity = null; + + if (item instanceof String) { + item = deserializeItem(item); + } + + if (item instanceof Person) { + activity = new Activity(); + GooglePlusActivityUtil.updateActivity((Person) item, activity); + } else if (item instanceof com.google.api.services.plus.model.Activity) { + activity = new Activity(); + GooglePlusActivityUtil.updateActivity((com.google.api.services.plus.model.Activity) item, activity); + } + + if (activity != null) { + result = new StreamsDatum(activity); + count++; + } + } catch (Exception ex) { + ex.printStackTrace(); + LOGGER.error("Exception while converting Person to Activity: {}", ex.getMessage()); + } + + if (result != null) { + return Stream.of(result).collect(Collectors.toList()); + } else { + return new ArrayList<>(); + } + } + + private Object deserializeItem(Object item) { + try { + Class klass = GPlusEventClassifier.detectClass((String) item); + + if (klass.equals(Person.class)) { + item = mapper.readValue((String) item, Person.class); + } else if (klass.equals(com.google.api.services.plus.model.Activity.class)) { + item = mapper.readValue((String) item, com.google.api.services.plus.model.Activity.class); + } + } catch (Exception ex) { + LOGGER.error("Exception while trying to deserializeItem: {}", ex); + } + + return item; + } + + @Override + public void prepare(Object configurationObject) { + mapper = StreamsJacksonMapper.getInstance(); + + SimpleModule simpleModule = new SimpleModule(); + simpleModule.addDeserializer(Person.class, new GPlusPersonDeserializer()); + mapper.registerModule(simpleModule); + + simpleModule = new SimpleModule(); + simpleModule.addDeserializer(com.google.api.services.plus.model.Activity.class, new GPlusActivityDeserializer()); + mapper.registerModule(simpleModule); + } + + @Override + public void cleanUp() { + //No-op + } +}