Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4C7F4200C23 for ; Wed, 8 Feb 2017 05:33:00 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 4AE0C160B3E; Wed, 8 Feb 2017 04:33:00 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CCD22160B68 for ; Wed, 8 Feb 2017 05:32:57 +0100 (CET) Received: (qmail 31311 invoked by uid 500); 8 Feb 2017 04:32:57 -0000 Mailing-List: contact commits-help@streams.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@streams.incubator.apache.org Delivered-To: mailing list commits@streams.incubator.apache.org Received: (qmail 31302 invoked by uid 99); 8 Feb 2017 04:32:57 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Feb 2017 04:32:57 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 71A90C0A0D for ; Wed, 8 Feb 2017 04:32:56 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id Fsk1p8p8waWx for ; Wed, 8 Feb 2017 04:32:50 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 369925F254 for ; Wed, 8 Feb 2017 04:32:48 +0000 (UTC) Received: (qmail 31226 invoked by uid 99); 8 Feb 2017 04:32:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Feb 2017 04:32:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2D749DFADC; Wed, 8 Feb 2017 04:32:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: smarthi@apache.org To: commits@streams.incubator.apache.org Date: Wed, 08 Feb 2017 04:32:48 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/9] incubator-streams git commit: STREAMS-463: Move every class in all repos underneath org.apache.streams, this closes apache/incubator-streams#356 archived-at: Wed, 08 Feb 2017 04:33:00 -0000 http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeUserActivityProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeUserActivityProvider.java b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeUserActivityProvider.java new file mode 100644 index 0000000..d4719ce --- /dev/null +++ b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeUserActivityProvider.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.youtube.provider; + +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.util.api.requests.backoff.BackOffStrategy; +import org.apache.streams.youtube.YoutubeConfiguration; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.services.youtube.YouTube; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Retrieve recent activity from a list of user ids or names. + */ +public class YoutubeUserActivityProvider extends YoutubeProvider { + + public YoutubeUserActivityProvider() { + super(); + } + + public YoutubeUserActivityProvider(YoutubeConfiguration config) { + super(config); + } + + /** + * To use from command line: + *

+ *

+ * Supply (at least) the following required configuration in application.conf: + *

+ *

+ * youtube.oauth.pathToP12KeyFile + * youtube.oauth.serviceAccountEmailAddress + * youtube.apiKey + * youtube.youtubeUsers + *

+ *

+ * Launch using: + *

+ *

+ * mvn exec:java -Dexec.mainClass=org.apache.streams.youtube.provider.YoutubeUserActivityProvider -Dexec.args="application.conf tweets.json" + * + * @param args args + * @throws Exception Exception + */ + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File file = new File(configfile); + assert (file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + YoutubeConfiguration config = new ComponentConfigurator<>(YoutubeConfiguration.class).detectConfiguration(typesafe, "youtube"); + YoutubeUserActivityProvider provider = new YoutubeUserActivityProvider(config); + + ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator iterator = provider.readCurrent().iterator(); + while (iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; + try { + if (datum.getDocument() instanceof String) { + json = (String) datum.getDocument(); + } else { + json = mapper.writeValueAsString(datum.getDocument()); + } + outStream.println(json); + } catch (JsonProcessingException ex) { + System.err.println(ex.getMessage()); + } + } + } + while (provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } + + @Override + protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue queue, YouTube youtube, UserInfo userInfo) { + return new YoutubeUserActivityCollector(youtube, queue, strategy, userInfo, config); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeActivityUtil.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeActivityUtil.java b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeActivityUtil.java new file mode 100644 index 0000000..4b985f6 --- /dev/null +++ b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeActivityUtil.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.youtube.serializer; + +import org.apache.streams.exceptions.ActivitySerializerException; +import org.apache.streams.pojo.extensions.ExtensionUtil; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.pojo.json.ActivityObject; +import org.apache.streams.pojo.json.Image; +import org.apache.streams.pojo.json.Provider; + +import com.google.api.services.youtube.YouTube; +import com.google.api.services.youtube.model.Channel; +import com.google.api.services.youtube.model.Thumbnail; +import com.google.api.services.youtube.model.ThumbnailDetails; +import com.google.api.services.youtube.model.Video; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class YoutubeActivityUtil { + + private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeActivityUtil.class); + + /** + * Given a {@link YouTube.Videos} object and an + * {@link Activity} object, fill out the appropriate details + * + * @param video Video + * @param activity Activity + * @throws ActivitySerializerException ActivitySerializerException + */ + public static void updateActivity(Video video, Activity activity, String channelId) throws ActivitySerializerException { + activity.setActor(buildActor(video, video.getSnippet().getChannelId())); + activity.setVerb("post"); + + activity.setId(formatId(activity.getVerb(), Optional.ofNullable(video.getId()).orElse(null))); + + activity.setPublished(new DateTime(video.getSnippet().getPublishedAt().getValue())); + activity.setTitle(video.getSnippet().getTitle()); + activity.setContent(video.getSnippet().getDescription()); + activity.setUrl("https://www.youtube.com/watch?v=" + video.getId()); + + activity.setProvider(getProvider()); + + activity.setObject(buildActivityObject(video)); + + addYoutubeExtensions(activity, video); + } + + + /** + * Given a {@link Channel} object and an + * {@link Activity} object, fill out the appropriate details + * + * @param channel Channel + * @param activity Activity + * @throws ActivitySerializerException ActivitySerializerException + */ + public static void updateActivity(Channel channel, Activity activity, String channelId) throws ActivitySerializerException { + try { + activity.setProvider(getProvider()); + activity.setVerb("post"); + activity.setActor(createActorForChannel(channel)); + Map extensions = new HashMap<>(); + extensions.put("youtube", channel); + activity.setAdditionalProperty("extensions", extensions); + } catch (Throwable throwable) { + throw new ActivitySerializerException(throwable); + } + } + + /** + * createActorForChannel. + * @param channel Channel + * @return $.actor + */ + public static ActivityObject createActorForChannel(Channel channel) { + ActivityObject actor = new ActivityObject(); + // TODO: use generic provider id concatenator + actor.setId("id:youtube:" + channel.getId()); + actor.setSummary(channel.getSnippet().getDescription()); + actor.setDisplayName(channel.getSnippet().getTitle()); + Image image = new Image(); + image.setUrl(channel.getSnippet().getThumbnails().getHigh().getUrl()); + actor.setImage(image); + actor.setUrl("https://youtube.com/user/" + channel.getId()); + Map actorExtensions = new HashMap<>(); + actorExtensions.put("followers", channel.getStatistics().getSubscriberCount()); + actorExtensions.put("posts", channel.getStatistics().getVideoCount()); + actor.setAdditionalProperty("extensions", actorExtensions); + return actor; + } + + /** + * Given a video object, create the appropriate activity object with a valid image + * (thumbnail) and video URL. + * @param video Video + * @return Activity Object with Video URL and a thumbnail image + */ + private static ActivityObject buildActivityObject(Video video) { + ActivityObject activityObject = new ActivityObject(); + + ThumbnailDetails thumbnailDetails = video.getSnippet().getThumbnails(); + Thumbnail thumbnail = thumbnailDetails.getDefault(); + + if (thumbnail != null) { + Image image = new Image(); + image.setUrl(thumbnail.getUrl()); + image.setHeight(thumbnail.getHeight()); + image.setWidth(thumbnail.getWidth()); + + activityObject.setImage(image); + } + + activityObject.setUrl("https://www.youtube.com/watch?v=" + video.getId()); + activityObject.setObjectType("video"); + + return activityObject; + } + + /** + * Add the Youtube extensions to the Activity object that we're building. + * @param activity Activity + * @param video Video + */ + private static void addYoutubeExtensions(Activity activity, Video video) { + Map extensions = ExtensionUtil.getInstance().ensureExtensions(activity); + + extensions.put("youtube", video); + + if (video.getStatistics() != null) { + Map likes = new HashMap<>(); + likes.put("count", video.getStatistics().getCommentCount()); + extensions.put("likes", likes); + } + } + + /** + * Build an {@link ActivityObject} actor given the video object + * @param video Video + * @param id id + * @return Actor object + */ + private static ActivityObject buildActor(Video video, String id) { + ActivityObject actor = new ActivityObject(); + + actor.setId("id:youtube:" + id); + actor.setDisplayName(video.getSnippet().getChannelTitle()); + actor.setSummary(video.getSnippet().getDescription()); + actor.setAdditionalProperty("handle", video.getSnippet().getChannelTitle()); + + return actor; + } + + /** + * Gets the common youtube {@link Provider} object + * @return a provider object representing YouTube + */ + public static Provider getProvider() { + Provider provider = new Provider(); + provider.setId("id:providers:youtube"); + provider.setDisplayName("YouTube"); + return provider; + } + + /** + * Formats the ID to conform with the Apache Streams activity ID convention + * @param idparts the parts of the ID to join + * @return a valid Activity ID in format "id:youtube:part1:part2:...partN" + */ + public static String formatId(String... idparts) { + return String.join(":", + Stream.concat(Arrays.stream(new String[] {"id:youtube"}), Arrays.stream(idparts)).collect(Collectors.toList())); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeChannelDeserializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeChannelDeserializer.java b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeChannelDeserializer.java new file mode 100644 index 0000000..e5d0ed0 --- /dev/null +++ b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeChannelDeserializer.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.youtube.serializer; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.api.client.util.DateTime; +import com.google.api.services.youtube.model.Channel; +import com.google.api.services.youtube.model.ChannelContentDetails; +import com.google.api.services.youtube.model.ChannelLocalization; +import com.google.api.services.youtube.model.ChannelSnippet; +import com.google.api.services.youtube.model.ChannelStatistics; +import com.google.api.services.youtube.model.ChannelTopicDetails; +import com.google.api.services.youtube.model.Thumbnail; +import com.google.api.services.youtube.model.ThumbnailDetails; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +/** + * YoutubeChannelDeserializer is a JsonDeserializer for Channel. + */ +public class YoutubeChannelDeserializer extends JsonDeserializer { + + @Override + public Channel deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException { + JsonNode node = jp.getCodec().readTree(jp); + try { + Channel channel = new Channel(); + if (node.findPath("etag") != null) { + channel.setEtag(node.get("etag").asText()); + } + if (node.findPath("kind") != null) { + channel.setKind(node.get("kind").asText()); + } + channel.setId(node.get("id").asText()); + channel.setTopicDetails(setTopicDetails(node.findValue("topicDetails"))); + channel.setStatistics(setChannelStatistics(node.findValue("statistics"))); + channel.setContentDetails(setContentDetails(node.findValue("contentDetails"))); + channel.setSnippet(setChannelSnippet(node.findValue("snippet"))); + return channel; + } catch (Throwable throwable) { + throw new IOException(throwable); + } + } + + protected ChannelSnippet setChannelSnippet(JsonNode node) { + ChannelSnippet snippet = new ChannelSnippet(); + snippet.setTitle(node.get("title").asText()); + snippet.setDescription(node.get("description").asText()); + snippet.setPublishedAt(new DateTime(node.get("publishedAt").get("value").longValue())); + snippet.setLocalized(setLocalized(node.findValue("localized"))); + snippet.setThumbnails(setThumbnails(node.findValue("thumbnails"))); + return snippet; + } + + protected ThumbnailDetails setThumbnails(JsonNode node) { + ThumbnailDetails details = new ThumbnailDetails(); + if (node == null) { + return details; + } + details.setDefault(new Thumbnail().setUrl(node.get("default").get("url").asText())); + details.setHigh(new Thumbnail().setUrl(node.get("high").get("url").asText())); + details.setMedium(new Thumbnail().setUrl(node.get("medium").get("url").asText())); + return details; + } + + protected ChannelLocalization setLocalized(JsonNode node) { + if (node == null) { + return new ChannelLocalization(); + } + ChannelLocalization localization = new ChannelLocalization(); + localization.setDescription(node.get("description").asText()); + localization.setTitle(node.get("title").asText()); + return localization; + } + + protected ChannelContentDetails setContentDetails(JsonNode node) { + ChannelContentDetails contentDetails = new ChannelContentDetails(); + if (node == null) { + return contentDetails; + } + if (node.findValue("googlePlusUserId") != null) { + contentDetails.setGooglePlusUserId(node.get("googlePlusUserId").asText()); + } + contentDetails.setRelatedPlaylists(setRelatedPlaylists(node.findValue("relatedPlaylists"))); + return contentDetails; + } + + protected ChannelContentDetails.RelatedPlaylists setRelatedPlaylists(JsonNode node) { + ChannelContentDetails.RelatedPlaylists playlists = new ChannelContentDetails.RelatedPlaylists(); + if (node == null) { + return playlists; + } + if (node.findValue("favorites") != null) { + playlists.setFavorites(node.get("favorites").asText()); + } + if (node.findValue("likes") != null) { + playlists.setLikes(node.get("likes").asText()); + } + if (node.findValue("uploads") != null) { + playlists.setUploads(node.get("uploads").asText()); + } + return playlists; + } + + protected ChannelStatistics setChannelStatistics(JsonNode node) { + ChannelStatistics stats = new ChannelStatistics(); + if (node == null) { + return stats; + } + stats.setCommentCount(node.get("commentCount").bigIntegerValue()); + stats.setHiddenSubscriberCount(node.get("hiddenSubscriberCount").asBoolean()); + stats.setSubscriberCount(node.get("subscriberCount").bigIntegerValue()); + stats.setVideoCount(node.get("videoCount").bigIntegerValue()); + stats.setViewCount(node.get("viewCount").bigIntegerValue()); + return stats; + } + + protected ChannelTopicDetails setTopicDetails(JsonNode node) { + ChannelTopicDetails details = new ChannelTopicDetails(); + if (node == null) { + return details; + } + List topicIds = new LinkedList<>(); + for (JsonNode jsonNode : node.get("topicIds")) { + topicIds.add(jsonNode.asText()); + } + details.setTopicIds(topicIds); + return details; + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeEventClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeEventClassifier.java b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeEventClassifier.java new file mode 100644 index 0000000..964d6ca --- /dev/null +++ b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeEventClassifier.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.youtube.serializer; + +import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.api.services.youtube.model.Video; +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.util.Objects; + +public class YoutubeEventClassifier { + private static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + private static final String VIDEO_IDENTIFIER = "\"youtube#video\""; + private static final String CHANNEL_IDENTIFIER = "youtube#channel"; + + /** + * detect probable Class of a json String from YouTube. + * @param json json + * @return Class + */ + public static Class detectClass(String json) { + Objects.requireNonNull(json); + Preconditions.checkArgument(StringUtils.isNotEmpty(json)); + + ObjectNode objectNode; + try { + objectNode = (ObjectNode) mapper.readTree(json); + } catch (IOException ex) { + ex.printStackTrace(); + return null; + } + + if (objectNode.findValue("kind") != null && objectNode.get("kind").toString().equals(VIDEO_IDENTIFIER)) { + return Video.class; + } else if (objectNode.findValue("kind") != null && objectNode.get("kind").toString().contains(CHANNEL_IDENTIFIER)) { + return com.google.api.services.youtube.model.Channel.class; + } else { + return ObjectNode.class; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeVideoDeserializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeVideoDeserializer.java b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeVideoDeserializer.java new file mode 100644 index 0000000..24f8e51 --- /dev/null +++ b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/serializer/YoutubeVideoDeserializer.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.youtube.serializer; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.api.client.util.DateTime; +import com.google.api.services.youtube.model.Thumbnail; +import com.google.api.services.youtube.model.ThumbnailDetails; +import com.google.api.services.youtube.model.Video; +import com.google.api.services.youtube.model.VideoSnippet; +import com.google.api.services.youtube.model.VideoStatistics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class YoutubeVideoDeserializer extends JsonDeserializer