streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mfrank...@apache.org
Subject [2/6] git commit: Updated facebook provider to have page feed provider and fixed serializer for posts
Date Thu, 04 Sep 2014 19:42:04 GMT
Updated facebook provider to have page feed provider and fixed serializer for posts


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e7b90335
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e7b90335
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e7b90335

Branch: refs/heads/master
Commit: e7b90335de261f2f2537a3ae94095da06ce4b592
Parents: dca6475
Author: Ryan Ebanks <rebanks@Informations-MacBook-Pro-3.local>
Authored: Wed Aug 27 06:51:56 2014 -0500
Committer: Ryan Ebanks <rebanks@Informations-MacBook-Pro-3.local>
Committed: Wed Aug 27 06:51:56 2014 -0500

----------------------------------------------------------------------
 .../streams/console/ConsolePersistWriter.java   |   5 +-
 .../api/FacebookPostActivitySerializer.java     |  29 +++-
 .../processor/FacebookTypeConverter.java        |   5 +
 .../pagefeed/FacebookDataCollector.java         | 138 ++++++++++++++++++
 .../pagefeed/FacebookPageFeedDataCollector.java | 124 +++++++++++++++++
 .../pagefeed/FacebookPageFeedProvider.java      |  28 ++++
 .../provider/pagefeed/FacebookProvider.java     | 139 +++++++++++++++++++
 .../FacebookStreamsPostSerializer.java          |  60 ++++++++
 .../streams/facebook/FacebookConfiguration.json |  33 +++++
 .../provider/pagefeed/TestFacebookProvider.java |  94 +++++++++++++
 10 files changed, 648 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e7b90335/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
index 3140479..96d116f 100644
--- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
+++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,7 +36,7 @@ public class ConsolePersistWriter implements StreamsPersistWriter {
 
     protected volatile Queue<StreamsDatum> persistQueue;
 
-    private ObjectMapper mapper = new ObjectMapper();
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
     public ConsolePersistWriter() {
         this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
@@ -60,7 +61,7 @@ public class ConsolePersistWriter implements StreamsPersistWriter {
 
             String text = mapper.writeValueAsString(entry);
 
-            System.out.println(text);
+            System.out.println("\n"+text+"\n");
 //            LOGGER.info(text);
 
         } catch (JsonProcessingException e) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e7b90335/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
index f1687ea..aa718fb 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
@@ -19,9 +19,12 @@
 package org.apache.streams.facebook.api;
 
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.exceptions.ActivitySerializerException;
@@ -67,6 +70,7 @@ public class FacebookPostActivitySerializer implements ActivitySerializer<org.ap
 
     @Override
     public Activity deserialize(Post post) throws ActivitySerializerException {
+
         Activity activity = new Activity();
         activity.setPublished(post.getCreatedTime());
         activity.setUpdated(post.getUpdatedTime());
@@ -76,6 +80,16 @@ public class FacebookPostActivitySerializer implements ActivitySerializer<org.ap
         parseObject(activity, mapper.convertValue(post, ObjectNode.class));
         fixObjectId(activity);
         fixContentFromSummary(activity);
+        activity.setVerb("post");
+        List<String> links = Lists.newLinkedList();
+        links.add(post.getLink());
+        activity.setLinks(links);
+        ensureExtensions(activity).put("facebook", post);
+        if(post.getLikes() != null) {
+            Map<String, Object> likes = Maps.newHashMap();
+            likes.put("count", post.getLikes().size());
+            ensureExtensions(activity).put("likes", likes);
+        }
         return activity;
     }
 
@@ -108,7 +122,7 @@ public class FacebookPostActivitySerializer implements ActivitySerializer<org.ap
 
     private void setProvider(Activity activity) {
         Provider provider = new Provider();
-        provider.setId(getProviderId(PROVIDER_NAME));
+        provider.setId("id:provider:"+PROVIDER_NAME);
         provider.setDisplayName(PROVIDER_NAME);
         activity.setProvider(provider);
     }
@@ -180,7 +194,9 @@ public class FacebookPostActivitySerializer implements ActivitySerializer<org.ap
 
     private void addLikeExtension(Activity activity, JsonNode value) {
         Map<String, Object> extensions = ensureExtensions(activity);
-        extensions.put(LIKES_EXTENSION, value.asInt());
+        Map<String, Object> likes = Maps.newHashMap();
+        likes.put("count", value.asLong());
+        extensions.put(LIKES_EXTENSION, likes);
     }
 
     private void addLocationExtension(Activity activity, JsonNode value) {
@@ -213,7 +229,7 @@ public class FacebookPostActivitySerializer implements ActivitySerializer<org.ap
     }
 
     private void addId(Activity activity, JsonNode value) {
-        activity.setId(getActivityId(PROVIDER_NAME, value.asText()));
+        activity.setId("id:"+PROVIDER_NAME+":"+value.asText());
     }
 
     private void addObjectLink(Activity activity, JsonNode value) {
@@ -223,7 +239,10 @@ public class FacebookPostActivitySerializer implements ActivitySerializer<org.ap
     private void addRebroadcastExtension(Activity activity, JsonNode value) {
         Map<String, Object> extensions = ensureExtensions(activity);
         if(value.has("count")) {
-            extensions.put(REBROADCAST_EXTENSION, value.get("count").asInt());
+            Map<String, Object> rebroadCast = Maps.newHashMap();
+            rebroadCast.put("count", value.get("count").asLong());
+            rebroadCast.put("perspectival", true);
+            extensions.put(REBROADCAST_EXTENSION, rebroadCast);
         }
     }
 
@@ -245,7 +264,7 @@ public class FacebookPostActivitySerializer implements ActivitySerializer<org.ap
             actor.setDisplayName(value.get("name").asText());
         }
         if(value.has("id")) {
-            actor.setId(getPersonId(PROVIDER_NAME, value.get("id").asText()));
+            actor.setId("id:"+PROVIDER_NAME+":"+value.get("id").asText());
         }
         activity.setActor(actor);
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e7b90335/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java
index 7eb0e65..e160c77 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java
@@ -175,6 +175,11 @@ public class FacebookTypeConverter implements StreamsProcessor {
 
                 if( out != null && validate(out, outClass))
                     result = new StreamsDatum(out);
+            } else if(item instanceof Post) {
+                Object out = convert(mapper.convertValue(item, ObjectNode.class), inClass, outClass);
+
+                if( out != null && validate(out, outClass))
+                    result = new StreamsDatum(out);
             }
         } catch (Exception e) {
             e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e7b90335/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookDataCollector.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookDataCollector.java
new file mode 100644
index 0000000..196eaca
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookDataCollector.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.facebook.provider.pagefeed;
+
+import com.google.common.annotations.VisibleForTesting;
+import facebook4j.Facebook;
+import facebook4j.FacebookFactory;
+import facebook4j.auth.AccessToken;
+import facebook4j.conf.ConfigurationBuilder;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.facebook.FacebookConfiguration;
+import org.apache.streams.facebook.IdConfig;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
+import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
+import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Abstract data collector for Facebook.  Iterates over ids and queues data to be output
+ * by a {@link org.apache.streams.core.StreamsProvider}
+ */
+public abstract class FacebookDataCollector implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookDataCollector.class);
+    private static final String READ_ONLY = "read_streams";
+
+    @VisibleForTesting
+    protected AtomicBoolean isComplete;
+    protected BackOffStrategy backOff;
+
+    private FacebookConfiguration config;
+    private BlockingQueue<StreamsDatum> queue;
+    private SimpleTokenManager<String> authTokens;
+
+
+    public FacebookDataCollector(FacebookConfiguration config, BlockingQueue<StreamsDatum> queue) {
+        this.config = config;
+        this.queue = queue;
+        this.isComplete = new AtomicBoolean(false);
+        this.backOff = new ExponentialBackOffStrategy(5);
+        this.authTokens = new BasicTokenManger<String>();
+        if(config.getUserAccessTokens() != null) {
+            for(String token : config.getUserAccessTokens()) {
+                this.authTokens.addTokenToPool(token);
+            }
+        }
+    }
+
+    /**
+     * Returns true when the collector has finished querying facebook and has queued all data
+     * for the provider
+     * @return
+     */
+    public boolean isComplete(){
+        return this.isComplete.get();
+    }
+
+    /**
+     * Queues facebook data
+     * @param data
+     * @param id
+     */
+    protected void outputData(Object data, String id) {
+        try {
+            this.queue.put(new StreamsDatum(data, id));
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Gets a Facebook client.  If multiple authenticated users for this app are available
+     * it will rotate through the users oauth credentials
+     * @return
+     */
+    protected Facebook getNextFacebookClient() {
+            ConfigurationBuilder cb = new ConfigurationBuilder();
+            cb.setDebugEnabled(true)
+                    .setOAuthAppId(this.config.getOauth().getAppId())
+                    .setOAuthAppSecret(this.config.getOauth().getAppSecret());
+            if(this.authTokens.numAvailableTokens() > 0)
+                    cb.setOAuthAccessToken(this.authTokens.getNextAvailableToken());
+            else {
+                cb.setOAuthAccessToken(this.config.getOauth().getAppAccessToken());
+                LOGGER.debug("appAccessToken : {}", this.config.getOauth().getAppAccessToken());
+            }
+                    cb.setOAuthPermissions(READ_ONLY)
+                    .setJSONStoreEnabled(true);
+            LOGGER.debug("appId : {}", this.config.getOauth().getAppId());
+            LOGGER.debug("appSecret: {}", this.config.getOauth().getAppSecret());
+            FacebookFactory ff = new FacebookFactory(cb.build());
+            return  ff.getInstance();
+    }
+
+    /**
+     * Queries facebook and queues the resulting data
+     * @param id
+     * @throws Exception
+     */
+    protected abstract void getData(IdConfig id) throws Exception;
+
+
+    @Override
+    public void run() {
+        for( IdConfig id : this.config.getIds()) {
+            try {
+                getData(id);
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            } catch (Exception e) {
+                LOGGER.error("Caught Exception while trying to poll data for page : {}", id);
+                LOGGER.error("Exception while getting page feed data: {}", e);
+            }
+        }
+        this.isComplete.set(true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e7b90335/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java
new file mode 100644
index 0000000..184d273
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedDataCollector.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.facebook.provider.pagefeed;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import facebook4j.*;
+import facebook4j.json.DataObjectFactory;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.facebook.FacebookConfiguration;
+import org.apache.streams.facebook.IdConfig;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Collects the page feed data from public Facebook pages
+ */
+public class FacebookPageFeedDataCollector extends FacebookDataCollector {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageFeedDataCollector.class);
+    private static final int MAX_ATTEMPTS = 5;
+    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+    private static final int LIMIT = 100;
+
+    public FacebookPageFeedDataCollector(BlockingQueue<StreamsDatum> queue, FacebookConfiguration configuration) {
+        super(configuration, queue);
+    }
+
+    @Override
+    protected void getData(IdConfig id) throws Exception {
+        boolean exit = false;
+
+                ResponseList<Post> facebookPosts = getPosts(id.getId());
+                LOGGER.debug("Post received : {}", facebookPosts.size());
+                backOff.reset();
+                do {
+                    for(Post post : facebookPosts) {
+                        if(id.getBeforeDate() != null && id.getAfterDate() != null) {
+                            if(id.getBeforeDate().isAfter(post.getCreatedTime().getTime())
+                                    && id.getAfterDate().isBefore(post.getCreatedTime().getTime())) {
+                                super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId());
+
+                            }
+                        } else if(id.getBeforeDate() != null && id.getBeforeDate().isAfter(post.getCreatedTime().getTime())) {
+                            super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId());
+                        } else if(id.getAfterDate() != null && id.getAfterDate().isBefore(post.getCreatedTime().getTime())) {
+                            super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId());
+                        } else if(id.getBeforeDate() == null && id.getAfterDate() == null) {
+                            super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(post), org.apache.streams.facebook.Post.class), post.getId());
+                        } else {
+                            exit = true;
+                            LOGGER.debug("Breaking on post, {}, with createdAtDate {}", post.getId(), post.getCreatedTime());
+                            break;
+                        }
+                    }
+                    if(facebookPosts.getPaging() != null && !exit) {
+                        LOGGER.debug("Paging. . .");
+                        facebookPosts = getPosts(facebookPosts.getPaging());
+                        backOff.reset();
+                        LOGGER.debug("Paging received {} posts*", facebookPosts.size());
+                    } else {
+                        LOGGER.debug("No more paging.");
+                        facebookPosts = null;
+                    }
+                } while(facebookPosts != null && facebookPosts.size() != 0);
+
+
+    }
+
+    private ResponseList<Post> getPosts(Paging<Post> paging) throws Exception{
+        return getPosts(null, paging);
+    }
+
+    private ResponseList<Post> getPosts(String pageId) throws Exception {
+        return getPosts(pageId, null);
+    }
+
+    /**
+     * Queries facebook.  Attempts requests up to 5 times and backs off on each facebook exception.
+     * @param pageId
+     * @param paging
+     * @return
+     * @throws Exception
+     */
+    private ResponseList<Post> getPosts(String pageId, Paging<Post> paging) throws Exception {
+        int attempt = 0;
+        while(attempt < MAX_ATTEMPTS) {
+            ++attempt;
+            try {
+                if (pageId != null) {
+                    Reading reading = new Reading();
+                    reading.limit(LIMIT);
+                    return getNextFacebookClient().getPosts(pageId, reading);
+                }
+                else
+                    return getNextFacebookClient().fetchNext(paging);
+            } catch (FacebookException fe) {
+                LOGGER.error("Facebook returned an exception : {}", fe);
+                LOGGER.error("Facebook returned an exception while trying to get feed for page, {} : {}", pageId, fe.getMessage());
+                // Rate limit exceptions with facebook4j unclear http://facebook4j.org/oldjavadocs/1.1.12-2.0.0/2.0.0/index.html?facebook4j/internal/http/HttpResponseCode.html
+                // back off at all exceptions until figured out.
+                super.backOff.backOff();
+            }
+        }
+        throw new Exception("Failed to get data from facebook after "+MAX_ATTEMPTS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e7b90335/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java
new file mode 100644
index 0000000..35507b9
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.facebook.provider.pagefeed;
+
+/**
+ *
+ */
+public class FacebookPageFeedProvider extends FacebookProvider{
+    @Override
+    protected FacebookDataCollector getDataCollector() {
+        return new FacebookPageFeedDataCollector(super.datums, super.configuration);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e7b90335/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookProvider.java
new file mode 100644
index 0000000..47f798b
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookProvider.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.facebook.provider.pagefeed;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import com.sun.corba.se.spi.orb.DataCollector;
+import com.typesafe.config.ConfigRenderOptions;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.facebook.FacebookConfiguration;
+import org.apache.streams.facebook.IdConfig;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.ComponentUtils;
+import org.apache.streams.util.SerializationUtil;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Abstract {@link org.apache.streams.core.StreamsProvider} for facebook.
+ */
+public abstract class FacebookProvider implements StreamsProvider {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookProvider.class);
+    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+    private static final int MAX_BATCH_SIZE = 2000;
+
+    protected FacebookConfiguration configuration;
+    protected BlockingQueue<StreamsDatum> datums;
+
+    private AtomicBoolean isComplete;
+    private ExecutorService executor;
+    private FacebookDataCollector dataCollector;
+
+
+    public FacebookProvider() {
+        try {
+            this.configuration = MAPPER.readValue(StreamsConfigurator.config.getConfig("facebook").root().render(ConfigRenderOptions.concise()), FacebookConfiguration.class);
+        } catch (IOException ioe) {
+            LOGGER.error("Exception trying to read default config : {}", ioe);
+        }
+    }
+
+    public FacebookProvider(FacebookConfiguration configuration) {
+        this.configuration = (FacebookConfiguration) SerializationUtil.cloneBySerialization(configuration);
+    }
+
+
+    @Override
+    public void startStream() {
+        this.dataCollector = getDataCollector();
+        this.executor.submit(dataCollector);
+    }
+
+    protected abstract FacebookDataCollector getDataCollector();
+
+    @Override
+    public StreamsResultSet readCurrent() {
+        int batchSize = 0;
+        BlockingQueue<StreamsDatum> batch = Queues.newLinkedBlockingQueue();
+        while(!this.datums.isEmpty() && batchSize < MAX_BATCH_SIZE) {
+            ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.datums), batch);
+            ++batchSize;
+        }
+        this.isComplete.set(batch.isEmpty() && this.datums.isEmpty() && this.dataCollector.isComplete());
+        return new StreamsResultSet(batch);
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return !this.isComplete.get();
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        this.datums = Queues.newLinkedBlockingQueue();
+        this.isComplete = new AtomicBoolean(false);
+        this.executor = Executors.newFixedThreadPool(1);
+    }
+
+    @Override
+    public void cleanUp() {
+        ComponentUtils.shutdownExecutor(executor, 5, 5);
+        executor = null;
+    }
+
+    /**
+     * Overrides the ids and addedAfter time in the configuration
+     * @param idsToAfterDate
+     */
+    public void overrideIds(Map<String, DateTime> idsToAfterDate) {
+        Set<IdConfig> ids = Sets.newHashSet();
+        for(String id : idsToAfterDate.keySet()) {
+            IdConfig idConfig = new IdConfig();
+            idConfig.setId(id);
+            idConfig.setAfterDate(idsToAfterDate.get(id));
+            ids.add(idConfig);
+        }
+        this.configuration.setIds(ids);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e7b90335/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/serializer/FacebookStreamsPostSerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/serializer/FacebookStreamsPostSerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/serializer/FacebookStreamsPostSerializer.java
new file mode 100644
index 0000000..3c5cae6
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/serializer/FacebookStreamsPostSerializer.java
@@ -0,0 +1,60 @@
+package org.apache.streams.facebook.serializer;
+
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.facebook.Post;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.pojo.json.Provider;
+
+import java.util.List;
+
+/**
+ * Converts {@link org.apache.streams.facebook.Post} to {@link org.apache.streams.pojo.json.Activity}.
+ * Only the Post-to-Activity direction ({@link #deserialize(Post)}) is implemented; the
+ * remaining {@link ActivitySerializer} methods are stubs that return {@code null}.
+ */
+public class FacebookStreamsPostSerializer implements ActivitySerializer<Post> {
+
+    private static final String FACEBOOK_STREAMS_ID = "id:provider:facebook";
+    private static final String ID_PREFIX = "id:facebook:";
+    private static final String PROVIDER_DISPLAY = "Facebook";
+
+    /**
+     * Not implemented.
+     * @return always {@code null}
+     */
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    /**
+     * Not implemented: converting an {@link Activity} back into a {@link Post} is unsupported.
+     * @param deserialized unused
+     * @return always {@code null}
+     */
+    @Override
+    public Post serialize(Activity deserialized) throws ActivitySerializerException {
+        return null;
+    }
+
+    /**
+     * Builds an {@link Activity} from a Facebook {@link Post}. The actor is derived from the
+     * post's author, the provider is the fixed Facebook provider, and the id and content are
+     * copied from the post's id and message.
+     * @param post the post to convert; its {@code from} value is dereferenced and must not be null
+     * @return the populated activity
+     */
+    @Override
+    public Activity deserialize(Post post) throws ActivitySerializerException {
+        Activity activity = new Activity();
+        activity.setActor(createActor(post));
+        activity.setProvider(createProvider(post));
+
+        activity.setId(post.getId());
+        activity.setContent(post.getMessage());
+        // Hand back the populated activity; returning null here would discard all of the work above.
+        return activity;
+    }
+
+    /**
+     * Not implemented.
+     * @param serializedList unused
+     * @return always {@code null}
+     */
+    @Override
+    public List<Activity> deserializeAll(List<Post> serializedList) {
+        return null;
+    }
+
+    /**
+     * Creates the actor for a post from the post's author.
+     * @param post the post whose {@code from} value supplies the name and id
+     * @return an actor whose display name is the author's name and whose id is the author's
+     *         id prefixed with {@code "id:facebook:"}
+     */
+    public Actor createActor(Post post) {
+        Actor actor = new Actor();
+        actor.setDisplayName(post.getFrom().getName());
+        actor.setId(ID_PREFIX+post.getFrom().getId());
+        return actor;
+    }
+
+    /**
+     * Creates the provider describing Facebook. The result does not depend on the post.
+     * @param post unused
+     * @return a provider with id {@code "id:provider:facebook"} and display name {@code "Facebook"}
+     */
+    public Provider createProvider(Post post) {
+        Provider provider = new Provider();
+        provider.setId(FACEBOOK_STREAMS_ID);
+        provider.setDisplayName(PROVIDER_DISPLAY);
+        return provider;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e7b90335/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookConfiguration.json
index b4e5afb..3356623 100644
--- a/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookConfiguration.json
+++ b/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookConfiguration.json
@@ -44,6 +44,39 @@
                     "type": "string"
                 }
             }
+        },
+        "userAccessTokens": {
+            "type": "array",
+            "uniqueItems": true,
+            "items": {
+                "type": "string",
+                "description": "User access tokens"
+            },
+            "description": "User access tokens that have been authorized for this app"
+        },
+        "ids": {
+            "type": "array",
+            "uniqueItems": true,
+            "items": {
+                "type": "object",
+                "description": "id for the application to use. user id, page id, etc. with date ranges",
+                "javaType": "org.apache.streams.facebook.IdConfig",
+                "javaInterfaces": ["java.io.Serializable"],
+                "properties": {
+                    "id": {
+                        "type": "string"
+                    },
+                    "beforeDate": {
+                        "type": "string",
+                        "format": "date-time"
+                    },
+                    "afterDate": {
+                        "type": "string",
+                        "format": "date-time"
+                    }
+                }
+            },
+            "description": "ids for the application to use. user ids, page ids, etc."
         }
    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e7b90335/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/provider/pagefeed/TestFacebookProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/provider/pagefeed/TestFacebookProvider.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/provider/pagefeed/TestFacebookProvider.java
new file mode 100644
index 0000000..5b4aaa2
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/provider/pagefeed/TestFacebookProvider.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.facebook.provider.pagefeed;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.facebook.FacebookConfiguration;
+import org.apache.streams.facebook.IdConfig;
+import org.junit.Test;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit Tests For {@link org.apache.streams.facebook.provider.pagefeed.FacebookProvider}
+ */
+public class TestFacebookProvider {
+
+    /**
+     * Verifies that the provider starts, exposes the data its collector produced, and
+     * reports that it has stopped once the collector has marked itself complete.
+     * The test thread and the collector thread meet twice at a two-party barrier:
+     * once after the collector has emitted its data, once after it has set the
+     * completion flag.
+     */
+    @Test
+    public void testFacebookProvider() throws Exception {
+        //Test that streams starts and shut downs.
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+        FacebookProvider provider = new FacebookProvider(new FacebookConfiguration()) {
+            @Override
+            protected FacebookDataCollector getDataCollector() {
+                return new TestFacebookDataCollector(barrier, super.configuration, super.datums);
+            }
+        };
+        provider.prepare(null);
+        provider.startStream();
+        assertTrue(provider.isRunning());
+        // First rendezvous: the collector has emitted all five items.
+        barrier.await();
+        assertTrue(provider.isRunning());
+        assertEquals(5, provider.readCurrent().size());
+        // Second rendezvous: the collector has set the completion flag.
+        barrier.await();
+        assertEquals(0, provider.readCurrent().size());
+        assertFalse(provider.isRunning());
+        provider.cleanUp();
+    }
+
+    /**
+     * Collector stub that emits five items and then coordinates with the test thread
+     * through the shared barrier instead of contacting Facebook.
+     * Declared static because it needs nothing from the enclosing test instance.
+     */
+    private static class TestFacebookDataCollector extends FacebookDataCollector {
+
+        private final CyclicBarrier barrier;
+
+        public TestFacebookDataCollector(CyclicBarrier barrier, FacebookConfiguration config, BlockingQueue<StreamsDatum> queue) {
+            super(config, queue);
+            this.barrier = barrier;
+        }
+
+        /** Intentionally empty: {@link #run()} is overridden and produces the test data itself. */
+        @Override
+        protected void getData(IdConfig id) throws Exception {
+
+        }
+
+        /**
+         * Emits five items, waits for the test thread, marks the collector complete,
+         * then waits for the test thread a second time.
+         */
+        @Override
+        public void run() {
+            try {
+                for(int i=0; i < 5; ++i) {
+                    // Integer.valueOf replaces the deprecated boxing constructor.
+                    super.outputData(Integer.valueOf(i), ""+i);
+                }
+                this.barrier.await();
+                super.isComplete.set(true);
+                this.barrier.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            } catch (BrokenBarrierException bbe) {
+                fail();
+            }
+        }
+    }
+
+
+
+}


Mime
View raw message