http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java new file mode 100644 index 0000000..f1687ea --- /dev/null +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/

package org.apache.streams.facebook.api;


import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.data.ActivitySerializer;
import org.apache.streams.exceptions.ActivitySerializerException;
import org.apache.streams.facebook.Post;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.*;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import static org.apache.streams.data.util.ActivityUtil.*;

/**
 * Converts a Facebook {@link Post} into a Streams {@link Activity}.
 *
 * <p>Only {@link #deserialize(Post)} is implemented; {@link #serialize(Activity)} and
 * {@link #deserializeAll(List)} throw {@link NotImplementedException}.
 *
 * <p>The post is converted to a JSON tree and each top-level field is mapped onto the
 * activity by {@code mapField}; fields without a mapping are ignored.
 *
 * <p>sblackmon: This class needs a rewrite
 */
public class FacebookPostActivitySerializer implements ActivitySerializer<Post> {

    public static final DateTimeFormatter FACEBOOK_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ");
    public static final DateTimeFormatter ACTIVITY_FORMAT = ISODateTimeFormat.basicDateTime();

    public static final String PROVIDER_NAME = "facebook";

    public static ObjectMapper mapper;
    static {
        mapper = StreamsJacksonMapper.getInstance();
    }

    /** @return the identifier of the serialization format handled by this class */
    @Override
    public String serializationFormat() {
        return "facebook_post_json_v1";
    }

    /**
     * Not supported.
     *
     * @throws NotImplementedException always
     */
    @Override
    public Post serialize(Activity deserialized) throws ActivitySerializerException {
        throw new NotImplementedException("Not currently supported by this deserializer");
    }

    /**
     * Builds an {@link Activity} from a Facebook post.
     *
     * @param post the post to convert
     * @return an activity whose published/updated times, actor, provider and object are
     *         populated from the post
     * @throws ActivitySerializerException declared by the interface and by the field mappers
     */
    @Override
    public Activity deserialize(Post post) throws ActivitySerializerException {
        Activity activity = new Activity();
        activity.setPublished(post.getCreatedTime());
        activity.setUpdated(post.getUpdatedTime());
        addActor(activity, mapper.convertValue(post.getFrom(), ObjectNode.class));
        setProvider(activity);
        setObjectType(post.getType(), activity);
        parseObject(activity, mapper.convertValue(post, ObjectNode.class));
        fixObjectId(activity);
        fixContentFromSummary(activity);
        return activity;
    }

    /**
     * Not supported.
     *
     * @throws NotImplementedException always
     */
    @Override
    public List<Activity> deserializeAll(List<Post> serializedList) {
        throw new NotImplementedException("Not currently supported by this deserializer");
    }

    /** Fills in the activity content from the "summary" property or the object summary when no message was mapped. */
    private void fixContentFromSummary(Activity activity) {
        //we MUST have a content field set, so choose the best option
        if (activity.getContent() == null) {
            Map<String, Object> additional = activity.getAdditionalProperties();
            activity.setContent(additional.containsKey("summary")
                    ? (String) additional.get("summary")
                    : activity.getObject().getSummary());
        }
    }

    /** Clears the object id when it still holds the schema default. */
    private void fixObjectId(Activity activity) {
        //An artifact of schema generation, the default value is {link}
        // Constant-first comparison: the id may be null when no default was applied.
        if ("{link}".equals(activity.getObject().getId())) {
            activity.getObject().setId(null);
        }
    }

    /** Creates the activity's object and records the post type on it. */
    private void setObjectType(String type, Activity activity) {
        ActivityObject object = new ActivityObject();
        activity.setObject(object);
        object.setObjectType(type);
    }

    /** Attaches the "facebook" provider to the activity. */
    private void setProvider(Activity activity) {
        Provider provider = new Provider();
        provider.setId(getProviderId(PROVIDER_NAME));
        provider.setDisplayName(PROVIDER_NAME);
        activity.setProvider(provider);
    }

    /**
     * Returns the node's text value after checking it has at least one field.
     *
     * @throws IllegalStateException if the node has no fields
     */
    private String getObjectType(JsonNode node) {
        ensureMoreFields(node.fields());
        //ensureNoMoreFields(fields);
        return node.asText();
    }

    /** Maps every top-level field of the post JSON onto the activity. */
    private void parseObject(Activity activity, JsonNode jsonNode) throws ActivitySerializerException {
        Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields();
        while (fields.hasNext()) {
            Map.Entry<String, JsonNode> field = fields.next();
            mapField(activity, field.getKey(), field.getValue());
        }
    }

    /** Dispatches one post field to its mapper; unrecognised field names are ignored. */
    private void mapField(Activity activity, String name, JsonNode value) throws ActivitySerializerException {
        if ("application".equals(name)) {
            addGenerator(activity, value);
        } else if ("caption".equals(name)) {
            addSummary(activity, value);
        } else if ("comments".equals(name)) {
            addAttachments(activity, value);
        } else if ("description".equals(name)) {
            addObjectSummary(activity, value);
        } else if ("from".equals(name)) {
            addActor(activity, value);
        } else if ("icon".equals(name)) {
            addIcon(activity, value);
        } else if ("id".equals(name)) {
            addId(activity, value);
        } else if ("is_hidden".equals(name)) {
            addObjectHiddenExtension(activity, value);
        } else if ("like_count".equals(name)) {
            addLikeExtension(activity, value);
        } else if ("link".equals(name)) {
            addObjectLink(activity, value);
        } else if ("message".equals(name)) {
            activity.setContent(value.asText());
        } else if ("name".equals(name)) {
            addObjectName(activity, value);
        } else if ("object_id".equals(name)) {
            addObjectId(activity, value);
        } else if ("picture".equals(name)) {
            addObjectImage(activity, value);
        } else if ("place".equals(name)) {
            addLocationExtension(activity, value);
        } else if ("shares".equals(name)) {
            addRebroadcastExtension(activity, value);
        } else if ("source".equals(name)) {
            addObjectLink(activity, value);
        } else if ("story".equals(name)) {
            addTitle(activity, value);
        }
    }

    private void addSummary(Activity activity, JsonNode value) {
        activity.setAdditionalProperty("summary", value.asText());
    }

    private void addTitle(Activity activity, JsonNode value) {
        activity.setTitle(value.asText());
    }

    private void addLikeExtension(Activity activity, JsonNode value) {
        Map<String, Object> extensions = ensureExtensions(activity);
        extensions.put(LIKES_EXTENSION, value.asInt());
    }

    /** Copies country and "longitude,latitude" coordinates from {@code place.location} into the location extension. */
    private void addLocationExtension(Activity activity, JsonNode value) {
        Map<String, Object> extensions = ensureExtensions(activity);
        if (value.has("location")) {
            Map<String, Object> location = new HashMap<String, Object>();
            JsonNode fbLocation = value.get("location");
            if (fbLocation.has("country")) {
                location.put(LOCATION_EXTENSION_COUNTRY, fbLocation.get("country"));
            }
            if (fbLocation.has("latitude") && fbLocation.has("longitude")) {
                location.put(LOCATION_EXTENSION_COORDINATES,
                        String.format("%s,%s", fbLocation.get("longitude"), fbLocation.get("latitude")));
            }
            extensions.put(LOCATION_EXTENSION, location);
        }
    }

    private void addObjectImage(Activity activity, JsonNode value) {
        Image image = new Image();
        image.setUrl(value.asText());
        activity.getObject().setImage(image);
    }

    private void addObjectId(Activity activity, JsonNode value) {
        activity.getObject().setId(getObjectId("facebook", activity.getObject().getObjectType(), value.asText()));
    }

    private void addObjectName(Activity activity, JsonNode value) {
        activity.getObject().setDisplayName(value.asText());
    }

    private void addId(Activity activity, JsonNode value) {
        activity.setId(getActivityId(PROVIDER_NAME, value.asText()));
    }

    private void addObjectLink(Activity activity, JsonNode value) {
        activity.getObject().setUrl(value.asText());
    }

    private void addRebroadcastExtension(Activity activity, JsonNode value) {
        Map<String, Object> extensions = ensureExtensions(activity);
        if (value.has("count")) {
            extensions.put(REBROADCAST_EXTENSION, value.get("count").asInt());
        }
    }

    private void addObjectHiddenExtension(Activity activity, JsonNode value) {
        Map<String, Object> extensions = ensureExtensions(activity);
        extensions.put("hidden", value.asBoolean());
    }

    private void addIcon(Activity activity, JsonNode value) {
        Icon icon = new Icon();
        //Apparently the Icon didn't map from the schema very well
        icon.setAdditionalProperty("url", value.asText());
        activity.setIcon(icon);
    }

    /**
     * Sets the activity's actor from a Facebook "from" node.
     *
     * @param value the "from" node; may be {@code null} when the post has no author, in
     *              which case an empty actor is set
     */
    private void addActor(Activity activity, JsonNode value) {
        Actor actor = new Actor();
        if (value != null) {
            if (value.has("name")) {
                actor.setDisplayName(value.get("name").asText());
            }
            if (value.has("id")) {
                actor.setId(getPersonId(PROVIDER_NAME, value.get("id").asText()));
            }
        }
        activity.setActor(actor);
    }

    private void addObjectSummary(Activity activity, JsonNode value) {
        activity.getObject().setSummary(value.asText());
    }

    /** Builds the generator from the post's "application" node (id, name, namespace). */
    private void addGenerator(Activity activity, JsonNode value) {
        Generator generator = new Generator();
        if (value.has("id")) {
            generator.setId(getObjectId(PROVIDER_NAME, "generator", value.get("id").asText()));
        }
        if (value.has("name")) {
            generator.setDisplayName(value.get("name").asText());
        }
        if (value.has("namespace")) {
            generator.setSummary(value.get("namespace").asText());
        }
        activity.setGenerator(generator);
    }

    private void addAttachments(Activity activity, JsonNode value) {
        //No direct mapping at this time
    }

    private static void ensureMoreFields(Iterator<Map.Entry<String, JsonNode>> fields) {
        if (!fields.hasNext()) {
            throw new IllegalStateException("Facebook activity must have one and only one root element");
        }
    }

    private static void ensureNoMoreFields(Iterator<Map.Entry<String, JsonNode>> fields) {
        if (fields.hasNext()) {
            throw new IllegalStateException("Facebook activity must have one and only one root element");
        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/feed/FacebookPublicFeedXmlActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/feed/FacebookPublicFeedXmlActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/feed/FacebookPublicFeedXmlActivitySerializer.java new file mode 100644 index 0000000..a44f982 --- /dev/null +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/feed/FacebookPublicFeedXmlActivitySerializer.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.facebook.feed; + +/** + * Empty placeholder class: it declares no fields or methods. + * Presumably reserved for a serializer of the Facebook public feed XML format (inferred from the name only; confirm before relying on it). + * Created by sblackmon on 10/2/13, 6:32 PM. + */ +public class FacebookPublicFeedXmlActivitySerializer { +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java new file mode 100644 index 0000000..2c6fd8e --- /dev/null +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.facebook.processor; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.exceptions.ActivitySerializerException; +import org.apache.streams.facebook.Post; +import org.apache.streams.facebook.api.FacebookPostActivitySerializer; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Queue; + +/** + * Created by sblackmon on 12/10/13. 
*/
public class FacebookTypeConverter implements StreamsProcessor {

    /** Identifier used in this processor's log output. */
    public final static String STREAMS_ID = "FacebookTypeConverter";

    private final static Logger LOGGER = LoggerFactory.getLogger(FacebookTypeConverter.class);

    // Both collaborators are created in prepare(); process() must not be called before it.
    private ObjectMapper mapper;

    private Queue<StreamsDatum> inQueue;
    private Queue<StreamsDatum> outQueue;

    private Class inClass;
    private Class outClass;

    private FacebookPostActivitySerializer facebookPostActivitySerializer;

    // Number of successful conversions performed by convert().
    private int count = 0;

    // Deliberately a distinct String instance (new String), not the interned literal.
    public final static String TERMINATE = new String("TERMINATE");

    /**
     * @param inClass  type of the incoming documents (stored; not consulted by this class)
     * @param outClass type each document is converted to: {@code Activity}, {@code Post},
     *                 {@code ObjectNode}, or {@code String} for pass-through of string documents
     */
    public FacebookTypeConverter(Class inClass, Class outClass) {
        this.inClass = inClass;
        this.outClass = outClass;
    }

    public Queue<StreamsDatum> getProcessorOutputQueue() {
        return outQueue;
    }

    public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
        inQueue = inputQueue;
    }

    /**
     * Converts a Facebook post JSON tree to the requested output type.
     *
     * @param event    the post as a JSON object
     * @param inClass  unused
     * @param outClass the requested result type
     * @return the converted object, or {@code null} when {@code outClass} is not one of
     *         {@code Activity}, {@code Post} or {@code ObjectNode}
     * @throws ActivitySerializerException if the activity serializer rejects the post
     * @throws JsonProcessingException declared for callers; not thrown directly here
     */
    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {

        Object result = null;

        if (outClass.equals(Activity.class)) {
            LOGGER.debug("ACTIVITY");
            result = facebookPostActivitySerializer.deserialize(mapper.convertValue(event, Post.class));
        } else if (outClass.equals(Post.class)) {
            LOGGER.debug("POST");
            result = mapper.convertValue(event, Post.class);
        } else if (outClass.equals(ObjectNode.class)) {
            LOGGER.debug("OBJECTNODE");
            result = mapper.convertValue(event, ObjectNode.class);
        }

        if (result != null) {
            count++;
            return result;
        }

        // no supported conversion was applied
        LOGGER.debug("CONVERT FAILED");

        return null;

    }

    /** Placeholder validation hook; currently accepts every document. */
    public boolean validate(Object document, Class klass) {

        // TODO
        return true;
    }

    /**
     * Checks whether a string parses completely as JSON.
     *
     * @param json the text to check; {@code null} is reported as invalid
     * @return {@code true} if the whole input tokenizes without error
     */
    public boolean isValidJSON(final String json) {
        if (json == null) {
            return false;
        }
        JsonParser parser = null;
        try {
            parser = new ObjectMapper().getJsonFactory().createJsonParser(json);
            // Drain every token; a malformed document throws before the end is reached.
            while (parser.nextToken() != null) {
            }
            return true;
        } catch (JsonParseException jpe) {
            LOGGER.warn("validate: invalid JSON", jpe);
        } catch (IOException ioe) {
            LOGGER.warn("validate: unable to read JSON", ioe);
        } finally {
            if (parser != null) {
                try {
                    parser.close();
                } catch (IOException ignored) {
                    // Closing a parser over an in-memory string; nothing useful to report.
                }
            }
        }

        return false;
    }

    /**
     * Converts one datum to the configured output class.
     *
     * @param entry the datum to convert; its document may be a JSON {@code String}, an
     *              {@code ObjectNode} or a {@code Post}
     * @return a single-element list holding the converted datum, or an empty list when the
     *         datum is empty, of an unsupported type, or conversion fails
     */
    @Override
    public List<StreamsDatum> process(StreamsDatum entry) {

        StreamsDatum result = null;

        if (entry == null || entry.getDocument() == null) {
            LOGGER.debug("{} received an empty datum", STREAMS_ID);
            return Lists.newArrayList();
        }

        try {

            Object item = entry.getDocument();

            LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());

            if (item instanceof String) {

                // if the target is string, just pass-through
                if (String.class.equals(outClass)) {
                    result = entry;
                } else {
                    // parsing doubles as the JSON validity check
                    result = convertNode((ObjectNode) mapper.readTree((String) item));
                }

            } else if (item instanceof ObjectNode || item instanceof Post) {

                result = convertNode((ObjectNode) mapper.valueToTree(item));

            }

        } catch (Exception e) {
            LOGGER.warn("Unable to convert datum", e);
        }

        if (result != null)
            return Lists.newArrayList(result);
        else
            return Lists.newArrayList();
    }

    /** Converts a post tree to the output class and wraps it in a datum; {@code null} if nothing was produced. */
    private StreamsDatum convertNode(ObjectNode node) throws ActivitySerializerException, JsonProcessingException {
        // since data is coming from outside provider, we don't know what type the events are
        // for now we'll assume post
        Object out = convert(node, Post.class, outClass);

        if (out != null && validate(out, outClass)) {
            return new StreamsDatum(out);
        }
        return null;
    }

    /** Creates the JSON mapper and the post serializer used by {@link #process(StreamsDatum)}. */
    @Override
    public void prepare(Object o) {
        mapper = new StreamsJacksonMapper();
        facebookPostActivitySerializer = new FacebookPostActivitySerializer();
    }

    @Override
    public void cleanUp() {

    }

}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java new file mode 100644 index 0000000..381e6f3 --- /dev/null +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/

package org.apache.streams.facebook.provider;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.MoreExecutors;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigRenderOptions;
import facebook4j.*;
import facebook4j.conf.ConfigurationBuilder;
import facebook4j.json.DataObjectFactory;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.facebook.FacebookUserInformationConfiguration;
import org.apache.streams.facebook.FacebookUserstreamConfiguration;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;

import java.io.IOException;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Provider that collects the feed of every friend of the authenticated Facebook user.
 *
 * <p>{@link #prepare(Object)} lists the user's friends and submits one task per friend to an
 * executor; each task pages through that friend's feed and places every post on the provider
 * queue. {@link #readCurrent()} drains the queue into a {@link StreamsResultSet}.
 *
 * <p>Worker tasks hold the read lock of {@link #lock} while enqueueing; {@link #readCurrent()}
 * holds the write lock while it snapshots and clears the queue.
 */
public class FacebookFriendFeedProvider implements StreamsProvider, Serializable
{

    public static final String STREAMS_ID = "FacebookFriendFeedProvider";
    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendFeedProvider.class);

    private static final ObjectMapper mapper = new StreamsJacksonMapper();

    // Comma-separated OAuth permission names passed to the facebook4j configuration.
    private static final String ALL_PERMISSIONS =
            "ads_management,ads_read,create_event,create_note,email,export_stream,"
            + "friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,"
            + "friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,"
            + "friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,"
            + "friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,"
            + "friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,"
            + "friends_subscriptions,friends_videos,friends_website,friends_work_history,"
            + "manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,"
            + "read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,"
            + "rsvp_event,share_item,sms,status_update,"
            + "user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,"
            + "user_activities,user_birthday,user_education_history,user_events,user_friends,user_games_activity,"
            + "user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,"
            + "user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,"
            + "user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,"
            + "video_upload,xmpp_login";

    private FacebookUserstreamConfiguration configuration;

    private Class klass;
    protected final ReadWriteLock lock = new ReentrantReadWriteLock();

    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();

    public FacebookUserstreamConfiguration getConfig() { return configuration; }

    public void setConfig(FacebookUserstreamConfiguration config) { this.configuration = config; }

    // Not used within this class.
    protected Iterator idsBatches;

    protected ExecutorService executor;

    protected DateTime start;
    protected DateTime end;

    protected final AtomicBoolean running = new AtomicBoolean();

    private DatumStatusCounter countersCurrent = new DatumStatusCounter();
    private DatumStatusCounter countersTotal = new DatumStatusCounter();

    /**
     * Creates a fixed-size pool with a bounded, fair work queue. When the queue is full the
     * submitting thread runs the task itself (caller-runs policy).
     */
    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                5000L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Reads the "facebook" section of the Streams configuration.
     *
     * @return the parsed configuration, or {@code null} (after logging) if it cannot be parsed
     */
    private static FacebookUserstreamConfiguration loadConfiguration() {
        Config config = StreamsConfigurator.config.getConfig("facebook");
        try {
            return mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserstreamConfiguration.class);
        } catch (IOException e) {
            LOGGER.error("Unable to read the facebook configuration", e);
            return null;
        }
    }

    /**
     * Creates a provider configured from the "facebook" section of the Streams configuration.
     * Previously the parsed value was stored in a local variable and discarded, leaving the
     * provider unconfigured.
     */
    public FacebookFriendFeedProvider() {
        this.configuration = loadConfiguration();
    }

    public FacebookFriendFeedProvider(FacebookUserstreamConfiguration config) {
        this.configuration = config;
    }

    /**
     * Creates a provider configured from the "facebook" section of the Streams configuration.
     *
     * @param klass stored on the provider; {@link #prepare(Object)} requires it to be non-null
     */
    public FacebookFriendFeedProvider(Class klass) {
        this.configuration = loadConfiguration();
        if (this.configuration == null) {
            // Keeps the original early-return: klass stays unset when configuration fails.
            return;
        }
        this.klass = klass;
    }

    public FacebookFriendFeedProvider(FacebookUserstreamConfiguration config, Class klass) {
        this.configuration = config;
        this.klass = klass;
    }

    public Queue<StreamsDatum> getProviderQueue() {
        return this.providerQueue;
    }

    /**
     * Shuts the executor down, waiting for the tasks queued by {@link #prepare(Object)} within
     * the timeouts of {@link #shutdownAndAwaitTermination(ExecutorService)}, then marks the
     * provider as running.
     */
    @Override
    public void startStream() {
        shutdownAndAwaitTermination(executor);
        running.set(true);
    }

    /**
     * Drains everything collected so far.
     *
     * @return a result set holding a copy of the queued datums and the counters accumulated
     *         since the previous call
     */
    public StreamsResultSet readCurrent() {

        StreamsResultSet current;

        // Exclusive with the worker tasks, which enqueue under the read lock.
        lock.writeLock().lock();
        try {
            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
            current.setCounter(new DatumStatusCounter());
            current.getCounter().add(countersCurrent);
            countersTotal.add(countersCurrent);
            countersCurrent = new DatumStatusCounter();
            providerQueue.clear();
        } finally {
            lock.writeLock().unlock();
        }

        return current;

    }

    /**
     * Not supported.
     *
     * @throws NotImplementedException always
     */
    public StreamsResultSet readNew(BigInteger sequence) {
        LOGGER.debug("{} readNew", STREAMS_ID);
        throw new NotImplementedException();
    }

    /**
     * Records the requested range and returns whatever has been collected so far.
     * The range is only stored; no filtering by date is performed here.
     */
    public StreamsResultSet readRange(DateTime start, DateTime end) {
        LOGGER.debug("{} readRange", STREAMS_ID);
        this.start = start;
        this.end = end;
        // The original cast providerQueue.iterator() to StreamsResultSet, which always failed.
        return readCurrent();
    }

    @Override
    public boolean isRunning() {
        return running.get();
    }

    /**
     * Shuts a pool down in two phases: an orderly shutdown with a 10 second grace period,
     * then a forced shutdown with another 10 second wait.
     *
     * @param pool the pool to stop; {@code null} (provider never prepared) is ignored
     */
    void shutdownAndAwaitTermination(ExecutorService pool) {
        if (pool == null) {
            return;
        }
        pool.shutdown(); // Disable new tasks from being submitted
        try {
            // Wait a while for existing tasks to terminate
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
                    LOGGER.error("Pool did not terminate");
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted
            pool.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Creates the executor, validates the configuration, and submits one feed task per friend
     * of the authenticated user.
     *
     * @throws NullPointerException if the configuration, the target class or any OAuth
     *                              credential is missing
     */
    @Override
    public void prepare(Object o) {

        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));

        Preconditions.checkNotNull(providerQueue);
        Preconditions.checkNotNull(this.klass);
        Preconditions.checkNotNull(configuration, "facebook configuration is not set");
        Preconditions.checkNotNull(configuration.getOauth().getAppId());
        Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
        Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());

        Facebook client = getFacebookClient();

        try {
            ResponseList<Friend> friendResponseList = client.friends().getFriends();
            while (friendResponseList != null) {

                for (Friend friend : friendResponseList) {
                    executor.submit(new FacebookFriendFeedTask(this, friend.getId()));
                }
                // Only ask for another page when the response carries paging information.
                Paging<Friend> friendPaging = friendResponseList.getPaging();
                friendResponseList = friendPaging == null ? null : client.fetchNext(friendPaging);
            }
        } catch (FacebookException e) {
            LOGGER.error("Unable to list friends", e);
        }

    }

    /** Builds a facebook4j client from the configured OAuth credentials. */
    protected Facebook getFacebookClient()
    {
        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setDebugEnabled(true)
            .setOAuthAppId(configuration.getOauth().getAppId())
            .setOAuthAppSecret(configuration.getOauth().getAppSecret())
            .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
            .setOAuthPermissions(ALL_PERMISSIONS)
            .setJSONStoreEnabled(true)
            .setClientVersion("v1.0");

        FacebookFactory ff = new FacebookFactory(cb.build());
        return ff.getInstance();
    }

    @Override
    public void cleanUp() {
        shutdownAndAwaitTermination(executor);
    }

    /** Pages through one friend's feed and enqueues every post as a datum. */
    private class FacebookFriendFeedTask implements Runnable {

        private final FacebookFriendFeedProvider provider;
        private final String id;

        public FacebookFriendFeedTask(FacebookFriendFeedProvider provider, String id) {
            this.provider = provider;
            this.id = id;
        }

        @Override
        public void run() {
            Facebook client = provider.getFacebookClient();
            try {
                ResponseList<Post> postResponseList = client.getFeed(id);
                while (postResponseList != null) {

                    for (Post item : postResponseList) {
                        String json = DataObjectFactory.getRawJSON(item);
                        org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
                        // Acquire outside the try so unlock() only runs after a successful lock().
                        lock.readLock().lock();
                        try {
                            ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
                            countersCurrent.incrementAttempt();
                        } finally {
                            lock.readLock().unlock();
                        }
                    }
                    // Only ask for another page when the response carries paging information.
                    Paging<Post> postPaging = postResponseList.getPaging();
                    postResponseList = postPaging == null ? null : client.fetchNext(postPaging);
                }

            } catch (Exception e) {
                LOGGER.error("Unable to read the feed of friend " + id, e);
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java new file mode 100644 index 0000000..2a5ec65 --- /dev/null +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.facebook.provider; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.MoreExecutors; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigRenderOptions; +import facebook4j.*; +import facebook4j.conf.ConfigurationBuilder; +import facebook4j.json.DataObjectFactory; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.facebook.FacebookUserInformationConfiguration; +import org.apache.streams.facebook.FacebookUserstreamConfiguration; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.util.ComponentUtils; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +import java.io.IOException; +import java.io.Serializable; +import java.math.BigInteger; +import java.util.Iterator; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializable +{ + + public static final String STREAMS_ID = "FacebookFriendPostsProvider"; + private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendUpdatesProvider.class); + + private static final ObjectMapper mapper = new StreamsJacksonMapper(); + + private static final String ALL_PERMISSIONS = 
"ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activitie s,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login"; + private FacebookUserstreamConfiguration configuration; + + private Class klass; + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + + protected volatile Queue providerQueue = new LinkedBlockingQueue(); + + public FacebookUserstreamConfiguration getConfig() { return configuration; } + + public void setConfig(FacebookUserstreamConfiguration config) { this.configuration = config; } + + protected Iterator idsBatches; + + protected ExecutorService executor; + + protected DateTime start; + protected DateTime end; + + protected final AtomicBoolean running = new AtomicBoolean(); + + private DatumStatusCounter countersCurrent = new 
DatumStatusCounter(); + private DatumStatusCounter countersTotal = new DatumStatusCounter(); + + private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { + return new ThreadPoolExecutor(nThreads, nThreads, + 5000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); + } + + public FacebookFriendUpdatesProvider() { + Config config = StreamsConfigurator.config.getConfig("facebook"); + FacebookUserInformationConfiguration configuration; + try { + configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); + } catch (IOException e) { + e.printStackTrace(); + return; + } + } + + public FacebookFriendUpdatesProvider(FacebookUserstreamConfiguration config) { + this.configuration = config; + } + + public FacebookFriendUpdatesProvider(Class klass) { + Config config = StreamsConfigurator.config.getConfig("facebook"); + FacebookUserInformationConfiguration configuration; + try { + configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); + } catch (IOException e) { + e.printStackTrace(); + return; + } + this.klass = klass; + } + + public FacebookFriendUpdatesProvider(FacebookUserstreamConfiguration config, Class klass) { + this.configuration = config; + this.klass = klass; + } + + public Queue getProviderQueue() { + return this.providerQueue; + } + + @Override + public void startStream() { + running.set(true); + } + + public StreamsResultSet readCurrent() { + + Preconditions.checkArgument(idsBatches.hasNext()); + + LOGGER.info("readCurrent"); + + // return stuff + + LOGGER.info("Finished. 
Cleaning up..."); + + LOGGER.info("Providing {} docs", providerQueue.size()); + + StreamsResultSet result = new StreamsResultSet(providerQueue); + running.set(false); + + LOGGER.info("Exiting"); + + return result; + + } + + public StreamsResultSet readNew(BigInteger sequence) { + LOGGER.debug("{} readNew", STREAMS_ID); + throw new NotImplementedException(); + } + + public StreamsResultSet readRange(DateTime start, DateTime end) { + LOGGER.debug("{} readRange", STREAMS_ID); + this.start = start; + this.end = end; + readCurrent(); + StreamsResultSet result = (StreamsResultSet)providerQueue.iterator(); + return result; + } + + @Override + public boolean isRunning() { + return running.get(); + } + + void shutdownAndAwaitTermination(ExecutorService pool) { + pool.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + pool.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) + System.err.println("Pool did not terminate"); + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } + + @Override + public void prepare(Object o) { + + executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); + + Preconditions.checkNotNull(providerQueue); + Preconditions.checkNotNull(this.klass); + Preconditions.checkNotNull(configuration.getOauth().getAppId()); + Preconditions.checkNotNull(configuration.getOauth().getAppSecret()); + Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken()); + + Facebook client = getFacebookClient(); + + try { + ResponseList friendResponseList = client.friends().getFriends(); + Paging friendPaging; + do { + + for( Friend friend : friendResponseList ) { + + 
//client.rawAPI().callPostAPI(); + // add a subscription + } + friendPaging = friendResponseList.getPaging(); + friendResponseList = client.fetchNext(friendPaging); + } while( friendPaging != null && + friendResponseList != null ); + } catch (FacebookException e) { + e.printStackTrace(); + } + + } + + protected Facebook getFacebookClient() + { + ConfigurationBuilder cb = new ConfigurationBuilder(); + cb.setDebugEnabled(true) + .setOAuthAppId(configuration.getOauth().getAppId()) + .setOAuthAppSecret(configuration.getOauth().getAppSecret()) + .setOAuthAccessToken(configuration.getOauth().getUserAccessToken()) + .setOAuthPermissions(ALL_PERMISSIONS) + .setJSONStoreEnabled(true) + .setClientVersion("v1.0"); + + FacebookFactory ff = new FacebookFactory(cb.build()); + Facebook facebook = ff.getInstance(); + + return facebook; + } + + @Override + public void cleanUp() { + shutdownAndAwaitTermination(executor); + } + + private class FacebookFeedPollingTask implements Runnable { + + FacebookUserstreamProvider provider; + Facebook client; + + private Set priorPollResult = Sets.newHashSet(); + + public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) { + provider = facebookUserstreamProvider; + } + + @Override + public void run() { + client = provider.getFacebookClient(); + while (provider.isRunning()) { + try { + ResponseList postResponseList = client.getHome(); + Set update = Sets.newHashSet(postResponseList); + Set repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update)); + Set entrySet = Sets.difference(update, repeats); + for (Post item : entrySet) { + String json = DataObjectFactory.getRawJSON(item); + org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class); + try { + lock.readLock().lock(); + ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue); + countersCurrent.incrementAttempt(); + } finally { + lock.readLock().unlock(); + } + } + priorPollResult = update; + 
Thread.sleep(configuration.getPollIntervalMillis()); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java new file mode 100644 index 0000000..fce5f22 --- /dev/null +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserInformationProvider.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.facebook.provider; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.MoreExecutors; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigRenderOptions; +import facebook4j.*; +import facebook4j.conf.ConfigurationBuilder; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.facebook.FacebookUserInformationConfiguration; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +import java.io.IOException; +import java.io.Serializable; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +public class FacebookUserInformationProvider implements StreamsProvider, Serializable +{ + + public static final String STREAMS_ID = "FacebookUserInformationProvider"; + private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserInformationProvider.class); + + private static final ObjectMapper mapper = new StreamsJacksonMapper(); + + private static final String ALL_PERMISSIONS = 
"ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activitie s,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login"; + private FacebookUserInformationConfiguration facebookUserInformationConfiguration; + + private Class klass; + protected volatile Queue providerQueue = new LinkedBlockingQueue(); + + public FacebookUserInformationConfiguration getConfig() { return facebookUserInformationConfiguration; } + + public void setConfig(FacebookUserInformationConfiguration config) { this.facebookUserInformationConfiguration = config; } + + protected Iterator idsBatches; + + protected ExecutorService executor; + + protected DateTime start; + protected DateTime end; + + protected final AtomicBoolean running = new AtomicBoolean(); + + private static ExecutorService 
newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { + return new ThreadPoolExecutor(nThreads, nThreads, + 5000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); + } + + public FacebookUserInformationProvider() { + Config config = StreamsConfigurator.config.getConfig("facebook"); + FacebookUserInformationConfiguration facebookUserInformationConfiguration; + try { + facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); + } catch (IOException e) { + e.printStackTrace(); + return; + } + } + + public FacebookUserInformationProvider(FacebookUserInformationConfiguration config) { + this.facebookUserInformationConfiguration = config; + } + + public FacebookUserInformationProvider(Class klass) { + Config config = StreamsConfigurator.config.getConfig("facebook"); + FacebookUserInformationConfiguration facebookUserInformationConfiguration; + try { + facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); + } catch (IOException e) { + e.printStackTrace(); + return; + } + this.klass = klass; + } + + public FacebookUserInformationProvider(FacebookUserInformationConfiguration config, Class klass) { + this.facebookUserInformationConfiguration = config; + this.klass = klass; + } + + public Queue getProviderQueue() { + return this.providerQueue; + } + + @Override + public void startStream() { + running.set(true); + } + + public StreamsResultSet readCurrent() { + + Preconditions.checkArgument(idsBatches.hasNext()); + + LOGGER.info("readCurrent"); + + Facebook client = getFacebookClient(); + + try { + User me = client.users().getMe(); + String json = mapper.writeValueAsString(me); + providerQueue.add( + new StreamsDatum(json, DateTime.now()) + ); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } catch 
(FacebookException e) { + e.printStackTrace(); + } + + if( idsBatches.hasNext()) { + while (idsBatches.hasNext()) { + try { + List userList = client.users().getUsers(idsBatches.next()); + for (User user : userList) { + + try { + String json = mapper.writeValueAsString(user); + providerQueue.add( + new StreamsDatum(json, DateTime.now()) + ); + } catch (JsonProcessingException e) { + // e.printStackTrace(); + } + } + + } catch (FacebookException e) { + e.printStackTrace(); + } + } + } else { + try { + ResponseList friendResponseList = client.friends().getFriends(); + Paging friendPaging; + do { + + for( Friend friend : friendResponseList ) { + + String json; + try { + json = mapper.writeValueAsString(friend); + providerQueue.add( + new StreamsDatum(json) + ); + } catch (JsonProcessingException e) { +// e.printStackTrace(); + } + } + friendPaging = friendResponseList.getPaging(); + friendResponseList = client.fetchNext(friendPaging); + } while( friendPaging != null && + friendResponseList != null ); + } catch (FacebookException e) { + e.printStackTrace(); + } + + } + + LOGGER.info("Finished. 
Cleaning up..."); + + LOGGER.info("Providing {} docs", providerQueue.size()); + + StreamsResultSet result = new StreamsResultSet(providerQueue); + running.set(false); + + LOGGER.info("Exiting"); + + return result; + + } + + public StreamsResultSet readNew(BigInteger sequence) { + LOGGER.debug("{} readNew", STREAMS_ID); + throw new NotImplementedException(); + } + + public StreamsResultSet readRange(DateTime start, DateTime end) { + LOGGER.debug("{} readRange", STREAMS_ID); + this.start = start; + this.end = end; + readCurrent(); + StreamsResultSet result = (StreamsResultSet)providerQueue.iterator(); + return result; + } + + @Override + public boolean isRunning() { + return running.get(); + } + + void shutdownAndAwaitTermination(ExecutorService pool) { + pool.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + pool.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) + System.err.println("Pool did not terminate"); + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } + + @Override + public void prepare(Object o) { + + executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); + + Preconditions.checkNotNull(providerQueue); + Preconditions.checkNotNull(this.klass); + Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppId()); + Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppSecret()); + Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getUserAccessToken()); + Preconditions.checkNotNull(facebookUserInformationConfiguration.getInfo()); + + List ids = new ArrayList(); + List idsBatches = new 
ArrayList(); + + for(String s : facebookUserInformationConfiguration.getInfo()) { + if(s != null) + { + ids.add(s); + + if(ids.size() >= 100) { + // add the batch + idsBatches.add(ids.toArray(new String[ids.size()])); + // reset the Ids + ids = new ArrayList(); + } + + } + } + + if(ids.size() > 0) + idsBatches.add(ids.toArray(new String[ids.size()])); + + this.idsBatches = idsBatches.iterator(); + } + + protected Facebook getFacebookClient() + { + ConfigurationBuilder cb = new ConfigurationBuilder(); + cb.setDebugEnabled(true) + .setOAuthAppId(facebookUserInformationConfiguration.getOauth().getAppId()) + .setOAuthAppSecret(facebookUserInformationConfiguration.getOauth().getAppSecret()) + .setOAuthAccessToken(facebookUserInformationConfiguration.getOauth().getUserAccessToken()) + .setOAuthPermissions(ALL_PERMISSIONS) + .setJSONStoreEnabled(true) + .setClientVersion("v1.0"); + + FacebookFactory ff = new FacebookFactory(cb.build()); + Facebook facebook = ff.getInstance(); + + return facebook; + } + + @Override + public void cleanUp() { + shutdownAndAwaitTermination(executor); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java new file mode 100644 index 0000000..d4f30e2 --- /dev/null +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.streams.facebook.provider;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigRenderOptions;
import facebook4j.*;
import facebook4j.Post;
import facebook4j.conf.ConfigurationBuilder;
import facebook4j.json.DataObjectFactory;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.facebook.FacebookUserInformationConfiguration;
import org.apache.streams.facebook.FacebookUserstreamConfiguration;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;

import java.io.IOException;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * {@link StreamsProvider} that polls Facebook feeds and emits each newly seen post.
 *
 * <p>One polling task is started per user id listed in the configuration ({@code info}); when no
 * ids are configured the authenticated user's own feed is polled. Pollers add posts to the
 * provider queue under the read lock; {@link #readCurrent()} drains the queue under the write lock.
 */
public class FacebookUserstreamProvider implements StreamsProvider, Serializable {

    public static final String STREAMS_ID = "FacebookUserstreamProvider";
    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserstreamProvider.class);

    private static final ObjectMapper mapper = new StreamsJacksonMapper();

    private static final String ALL_PERMISSIONS = "read_stream";
    private FacebookUserstreamConfiguration configuration;

    private Class<?> klass;
    protected final ReadWriteLock lock = new ReentrantReadWriteLock();

    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();

    public FacebookUserstreamConfiguration getConfig() {
        return configuration;
    }

    public void setConfig(FacebookUserstreamConfiguration config) {
        this.configuration = config;
    }

    protected ListeningExecutorService executor;

    protected DateTime start;
    protected DateTime end;

    protected final AtomicBoolean running = new AtomicBoolean();

    private DatumStatusCounter countersCurrent = new DatumStatusCounter();
    private DatumStatusCounter countersTotal = new DatumStatusCounter();

    protected Facebook client;

    /**
     * Builds a fixed-size pool backed by a bounded, fair queue. When the queue is full the
     * submitting thread runs the task itself ({@code CallerRunsPolicy}).
     */
    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                5000L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Reads the {@code facebook} block of the global streams configuration.
     *
     * @return the parsed configuration, or {@code null} if it could not be parsed (the failure is logged)
     */
    private static FacebookUserstreamConfiguration loadConfiguration() {
        Config config = StreamsConfigurator.config.getConfig("facebook");
        try {
            return mapper.readValue(config.root().render(ConfigRenderOptions.concise()),
                    FacebookUserstreamConfiguration.class);
        } catch (IOException e) {
            LOGGER.error("Unable to parse the \"facebook\" configuration block", e);
            return null;
        }
    }

    /** Creates a provider configured from the global streams configuration. */
    public FacebookUserstreamProvider() {
        // The parsed value used to be stored in a local of a different type and then dropped,
        // leaving the field null; assign it to the field instead.
        this.configuration = loadConfiguration();
    }

    public FacebookUserstreamProvider(FacebookUserstreamConfiguration config) {
        this.configuration = config;
    }

    /** Creates a provider configured from the global streams configuration, emitting {@code klass}. */
    public FacebookUserstreamProvider(Class<?> klass) {
        this.configuration = loadConfiguration();
        this.klass = klass;
    }

    public FacebookUserstreamProvider(FacebookUserstreamConfiguration config, Class<?> klass) {
        this.configuration = config;
        this.klass = klass;
    }

    public Queue<StreamsDatum> getProviderQueue() {
        return this.providerQueue;
    }

    /**
     * Starts one polling task per configured user id, or a single task for the authenticated
     * user when no ids are configured. Requires {@link #prepare(Object)} to have been called.
     */
    @Override
    public void startStream() {

        client = getFacebookClient();

        // Raise the flag before submitting: every poller loops on isRunning(), so a task that
        // starts before the flag is set would exit immediately.
        running.set(true);

        List<String> ids = configuration.getInfo();
        if (ids != null && !ids.isEmpty()) {
            for (String id : ids) {
                executor.submit(new FacebookFeedPollingTask(this, id));
            }
        } else {
            try {
                String id = client.getMe().getId();
                executor.submit(new FacebookFeedPollingTask(this, id));
            } catch (FacebookException e) {
                LOGGER.error("Unable to resolve the authenticated user's id", e);
                running.set(false);
            }
        }
    }

    /**
     * Drains the provider queue into a new result set and rolls the current counters into the totals.
     *
     * @return a snapshot of every datum queued since the previous call, with its counter attached
     */
    public StreamsResultSet readCurrent() {

        StreamsResultSet current;

        // Pollers enqueue under the read lock, so the copy-then-clear below must hold the write
        // lock; otherwise a datum offered between the copy and the clear would be dropped.
        // The provider queue is unbounded, so a poller cannot block on offer while holding the
        // read lock.
        lock.writeLock().lock();
        try {
            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
            current.setCounter(new DatumStatusCounter());
            current.getCounter().add(countersCurrent);
            countersTotal.add(countersCurrent);
            countersCurrent = new DatumStatusCounter();
            providerQueue.clear();
        } finally {
            lock.writeLock().unlock();
        }

        return current;
    }

    /** Not supported; always throws. */
    public StreamsResultSet readNew(BigInteger sequence) {
        LOGGER.debug("{} readNew", STREAMS_ID);
        // NOTE(review): this exception type lives in a sun.* package; it is kept so the thrown
        // type does not change for existing callers.
        throw new NotImplementedException();
    }

    /**
     * Records the requested range and returns the same snapshot as {@link #readCurrent()}.
     * The range is stored but not applied as a filter.
     */
    public StreamsResultSet readRange(DateTime start, DateTime end) {
        LOGGER.debug("{} readRange", STREAMS_ID);
        this.start = start;
        this.end = end;
        // The previous version drained the queue with readCurrent(), discarded the result and
        // then cast providerQueue.iterator() to StreamsResultSet, which can never succeed.
        return readCurrent();
    }

    @Override
    public boolean isRunning() {
        return running.get();
    }

    /** Shuts the pool down, escalating to {@code shutdownNow()} if tasks do not finish in time. */
    void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown(); // Disable new tasks from being submitted
        try {
            // Wait a while for existing tasks to terminate
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                    LOGGER.error("Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted
            pool.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Validates the configuration, then creates the worker pool and the Facebook client.
     *
     * @param o unused
     */
    @Override
    public void prepare(Object o) {

        // Validate before allocating the pool so a failed precondition does not leak threads.
        Preconditions.checkNotNull(providerQueue);
        Preconditions.checkNotNull(this.klass);
        Preconditions.checkNotNull(configuration);
        Preconditions.checkNotNull(configuration.getOauth().getAppId());
        Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
        Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());

        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));

        client = getFacebookClient();

        // The id-batching code that used to follow built two local lists and discarded them;
        // startStream() reads configuration.getInfo() directly, so it has been removed.
    }

    /** Builds a Facebook client from the OAuth settings in {@link #getConfig()}. */
    protected Facebook getFacebookClient() {
        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setDebugEnabled(true)
                .setOAuthAppId(configuration.getOauth().getAppId())
                .setOAuthAppSecret(configuration.getOauth().getAppSecret())
                .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
                .setOAuthPermissions(ALL_PERMISSIONS)
                .setJSONStoreEnabled(true);

        FacebookFactory ff = new FacebookFactory(cb.build());
        return ff.getInstance();
    }

    @Override
    public void cleanUp() {
        // Let the pollers leave their loops on their own before the pool is shut down.
        running.set(false);
        if (executor != null) {
            shutdownAndAwaitTermination(executor);
        }
    }

    /** Repeatedly fetches one feed and enqueues the posts that were absent from the previous poll. */
    private class FacebookFeedPollingTask implements Runnable {

        FacebookUserstreamProvider provider;
        Facebook client;
        String id;

        private Set<Post> priorPollResult = Sets.newHashSet();

        public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) {
            // Previously left client and id null. "me" is the Graph API alias for the
            // authenticated user -- NOTE(review): confirm before relying on this constructor.
            this(facebookUserstreamProvider, "me");
        }

        public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider, String id) {
            this.provider = facebookUserstreamProvider;
            this.client = provider.client;
            this.id = id;
        }

        @Override
        public void run() {
            while (provider.isRunning()) {
                pollOnce();
                try {
                    Thread.sleep(configuration.getPollIntervalMillis());
                } catch (InterruptedException e) {
                    // Stop polling: looping on with the interrupt flag set would make every
                    // subsequent sleep throw at once and hammer the API.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /** Fetches the feed once and enqueues the posts that were not in the previous response. */
        private void pollOnce() {
            try {
                ResponseList<Post> postResponseList = client.getFeed(id);

                Set<Post> update = Sets.newHashSet(postResponseList);
                Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update));
                Set<Post> entrySet = Sets.difference(update, repeats);
                LOGGER.debug("{} response: {} previous: {} new: {}",
                        this.id, update.size(), repeats.size(), entrySet.size());
                for (Post item : entrySet) {
                    String json = DataObjectFactory.getRawJSON(item);
                    org.apache.streams.facebook.Post post =
                            mapper.readValue(json, org.apache.streams.facebook.Post.class);
                    lock.readLock().lock();
                    try {
                        ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
                        countersCurrent.incrementAttempt();
                    } finally {
                        lock.readLock().unlock();
                    }
                }
                priorPollResult = update;
            } catch (Exception e) {
                // Best effort: log and try again on the next interval.
                LOGGER.error("Polling the feed of " + id + " failed", e);
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookConfiguration.json deleted file mode 100644 index b4e5afb..0000000 --- a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookConfiguration.json +++ /dev/null @@ -1,49 +0,0 @@ -{ - "type": "object", - "$schema": "http://json-schema.org/draft-03/schema", - "id": "#", - "javaType" : "org.apache.streams.facebook.FacebookConfiguration", - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "protocol": { - "type": "string", - "description": "The protocol" - }, - "host": { - "type": "string", - "description": "The host" - }, - "port": { - "type": "integer", - "description": "The port" - }, - "version": { - "type": "string", - "description": "The version" - }, - "endpoint": { - "type": "string", - "description": "The endpoint" - }, - "oauth": { - "type": "object", - "dynamic": "true", - "javaType" : "org.apache.streams.facebook.FacebookOAuthConfiguration", - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "appId": { - "type": "string" - }, - "appSecret": { - "type": "string" - }, - "appAccessToken": { - "type": "string" - }, - "userAccessToken": { - "type": "string" - } - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserInformationConfiguration.json 
---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserInformationConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserInformationConfiguration.json deleted file mode 100644 index b351be9..0000000 --- a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserInformationConfiguration.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "type": "object", - "$schema": "http://json-schema.org/draft-03/schema", - "id": "#", - "javaType" : "org.apache.streams.facebook.FacebookUserInformationConfiguration", - "javaInterfaces": ["java.io.Serializable"], - "extends": {"$ref":"FacebookConfiguration.json"}, - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "info": { - "type": "array", - "description": "A list of user IDs, indicating users of interest", - "items": { - "type": "string" - } - }, - "pollIntervalMillis": { - "type": "integer", - "default" : "60000", - "description": "Polling interval in ms" - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json deleted file mode 100644 index bcb2258..0000000 --- a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "type": "object", - "$schema": "http://json-schema.org/draft-03/schema", - "id": "#", - "javaType" : 
"org.apache.streams.facebook.FacebookUserstreamConfiguration", - "javaInterfaces": ["java.io.Serializable"], - "extends": {"$ref":"FacebookConfiguration.json"}, - "properties": { - "info": { - "type": "array", - "description": "A list of user IDs, indicating users of interest", - "items": { - "type": "string" - } - }, - "pollIntervalMillis": { - "type": "integer", - "default" : "60000", - "description": "Polling interval in ms" - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/graph/Post.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/graph/Post.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/graph/Post.json deleted file mode 100644 index 23bcb08..0000000 --- a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/graph/Post.json +++ /dev/null @@ -1,203 +0,0 @@ -{ - "type": "object", - "$schema": "http://json-schema.org/draft-03/schema", - "id": "#", - "javaType": "org.apache.streams.facebook.Post", - "properties": { - "id": { - "type": "string" - }, - "from": { - "type": "object", - "properties": { - "id": { - "type": "string" - }, - "name": { - "type": "string" - } - } - }, - "to": { - "type": "object", - "properties": { - "data": { - "type": "array", - "items": { - "type": "object", - "properties": { - "id": { - "type": "string" - }, - "name": { - "type": "string" - } - } - } - } - } - }, - "message": { - "type": "string" - }, - "message_tags": { - "type": "object", - "properties": { - "data": { - "type": "array", - "items": { - "type": "object", - "properties": { - "id": { - "type": "string" - }, - "name": { - "type": "string" - } - } - } - } - } - }, - "picture": { - "type": "string" - }, - "link": { - "type": "string" - }, - "name": { - "type": "string" - }, 
- "caption": { - "type": "string" - }, - "description": { - "type": "string" - }, - "source": { - "type": "string" - }, - "icon": { - "type": "string" - }, - "actions": { - "type": "array", - "items": { - "type": "object", - "properties": { - "name": { - "type": "string" - }, - "link": { - "type": "string" - } - } - } - }, - "comments": { - "type": "array", - "items": { - "type": "object", - "properties": { - "id": { - "type": "string" - }, - "from": { - "type": "string" - }, - "message": { - "type": "string" - }, - "created_time": { - "type": "string", - "format" : "date-time" - } - } - } - }, - "likes": { - "type": "array", - "items": { - "type": "object", - "properties": { - "name": { - "type": "string" - }, - "link": { - "type": "string" - } - } - } - }, - "type": { - "type": "string" - }, - "place": { - "type": "object", - "properties": { - "name": { - "type": "string" - }, - "id": { - "type": "string" - } - } - }, - "story": { - "type": "string" - }, - "shares": { - "type": "int" - }, - "object_id": { - "type": "int" - }, - "application": { - "type": "object", - "properties": { - "name": { - "type": "string" - }, - "id": { - "type": "string" - } - } - }, - "created_time": { - "type": "string", - "format" : "date-time" - }, - "updated_time": { - "type": "string", - "format" : "date-time" - }, - "include_hidden": { - "type": "boolean" - }, - "status_type": { - "type": "string" - }, - "properties": { - "type": "array", - "items": { - "type": "object", - "properties": { - "name": { - "type": "string" - }, - "text": { - "type": "string" - } - } - } - }, - "privacy": { - "type": "object", - "properties": { - "value": { - "type": "string" - } - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookConfiguration.json ---------------------------------------------------------------------- diff --git 
a/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookConfiguration.json new file mode 100644 index 0000000..b4e5afb --- /dev/null +++ b/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookConfiguration.json @@ -0,0 +1,49 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.facebook.FacebookConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "protocol": { + "type": "string", + "description": "The protocol" + }, + "host": { + "type": "string", + "description": "The host" + }, + "port": { + "type": "integer", + "description": "The port" + }, + "version": { + "type": "string", + "description": "The version" + }, + "endpoint": { + "type": "string", + "description": "The endpoint" + }, + "oauth": { + "type": "object", + "dynamic": "true", + "javaType" : "org.apache.streams.facebook.FacebookOAuthConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "appId": { + "type": "string" + }, + "appSecret": { + "type": "string" + }, + "appAccessToken": { + "type": "string" + }, + "userAccessToken": { + "type": "string" + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookUserInformationConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookUserInformationConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookUserInformationConfiguration.json new file mode 100644 index 
0000000..b351be9 --- /dev/null +++ b/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookUserInformationConfiguration.json @@ -0,0 +1,23 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.facebook.FacebookUserInformationConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "extends": {"$ref":"FacebookConfiguration.json"}, + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "info": { + "type": "array", + "description": "A list of user IDs, indicating users of interest", + "items": { + "type": "string" + } + }, + "pollIntervalMillis": { + "type": "integer", + "default" : 60000, + "description": "Polling interval in ms" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookUserstreamConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookUserstreamConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookUserstreamConfiguration.json new file mode 100644 index 0000000..bcb2258 --- /dev/null +++ b/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookUserstreamConfiguration.json @@ -0,0 +1,22 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.facebook.FacebookUserstreamConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "extends": {"$ref":"FacebookConfiguration.json"}, + "properties": { + "info": { + "type": "array", + "description": "A list of user IDs, indicating users of interest", + "items": { + "type": "string" + } + }, +
"pollIntervalMillis": { + "type": "integer", + "default" : 60000, + "description": "Polling interval in ms" + } + } +} \ No newline at end of file