pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] srkukarni closed pull request #2317: Make twitter source to emit records with schema info
Date Thu, 23 Aug 2018 21:31:00 GMT
srkukarni closed pull request #2317: Make twitter source to emit records with schema info
URL: https://github.com/apache/incubator-pulsar/pull/2317
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
new file mode 100644
index 0000000000..fab936e3a2
--- /dev/null
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.twitter;
+
+import java.util.List;
+
+import lombok.Data;
+
+@Data
+public class TweetData {
+    private String createdAt;
+    private Long id;
+    private String idStr;
+    private String text;
+    private String source;
+    private Boolean truncated;
+    private User user;
+    private RetweetedStatus retweetedStatus;
+    private Boolean isQuoteStatus;
+    private Long quoteCount;
+    private Long replyCount;
+    private Long retweetCount;
+    private Long favoriteCount;
+    private Boolean favorited;
+    private Boolean retweeted;
+    private String filterLevel;
+    private String lang;
+    private String timestampMs;
+
+    private Delete delete;
+
+    @Data
+    public static class User {
+        private Long id;
+        private String idStr;
+        private String name;
+        private String screenName;
+        private String location;
+        private String description;
+        private String translatorType;
+        private Boolean _protected;
+        private Boolean verified;
+        private Long followersCount;
+        private Long friendsCount;
+        private Long listedCount;
+        private Long favouritesCount;
+        private Long statusesCount;
+        private String createdAt;
+        private Boolean geoEnabled;
+        private String lang;
+        private Boolean contributorsEnabled;
+        private Boolean isTranslator;
+        private String profileBackgroundColor;
+        private String profileBackgroundImageUrl;
+        private String profileBackgroundImageUrlHttps;
+        private Boolean profileBackgroundTile;
+        private String profileLinkColor;
+        private String profileSidebarBorderColor;
+        private String profileSidebarFillColor;
+        private String profileTextColor;
+        private Boolean profileUseBackgroundImage;
+        private String profileImageUrl;
+        private String profileImageUrlHttps;
+        private String profileBannerUrl;
+        private Boolean defaultProfile;
+        private Boolean defaultProfileImage;
+    }
+
+    @Data
+    public static class Url {
+        private String url;
+        private String expandedUrl;
+        private String displayUrl;
+        private List<Long> indices = null;
+    }
+
+    @Data
+    public static class RetweetedStatus {
+        private String createdAt;
+        private Long id;
+        private String idStr;
+        private String text;
+        private String source;
+        private Boolean truncated;
+        private User user;
+        private Boolean isQuoteStatus;
+        private Long quoteCount;
+        private Long replyCount;
+        private Long retweetCount;
+        private Long favoriteCount;
+        private Boolean favorited;
+        private Boolean retweeted;
+        private String filterLevel;
+        private String lang;
+    }
+
+    @Data
+    public static class Status {
+        private Long id;
+        private String idStr;
+        private Long userId;
+        private String userIdStr;
+    }
+
+    @Data
+    public static class Delete {
+        private Status status;
+        private String timestampMs;
+    }
+}
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
index 76ea4ac250..eef9077427 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
@@ -19,6 +19,8 @@
 
 package org.apache.pulsar.io.twitter;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.twitter.hbc.ClientBuilder;
 import com.twitter.hbc.common.DelimitedStreamReader;
 import com.twitter.hbc.core.Constants;
@@ -44,7 +46,7 @@
 /**
  * Simple Push based Twitter FireHose Source
  */
-public class TwitterFireHose extends PushSource<String> {
+public class TwitterFireHose extends PushSource<TweetData> {
 
     private static final Logger LOG = LoggerFactory.getLogger(TwitterFireHose.class);
 
@@ -53,6 +55,8 @@
     // ----- Runtime fields
     private Object waitObject;
 
+    private final ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) throws IOException {
         TwitterFireHoseConfig hoseConfig = TwitterFireHoseConfig.load(config);
@@ -116,14 +120,17 @@ public void setup(InputStream input) {
 
                     @Override
                     public boolean process() throws IOException, InterruptedException {
-                        String line = reader.readLine();
+                        String tweetStr = reader.readLine();
+
                         try {
+                            TweetData tweet = mapper.readValue(tweetStr, TweetData.class);
+
                             // We don't really care if the record succeeds or not.
                             // However might be in the future to count failures
                             // TODO:- Figure out the metrics story for connectors
-                            consume(new TwitterRecord(line));
+                            consume(new TwitterRecord(tweet));
                         } catch (Exception e) {
-                            LOG.error("Exception thrown");
+                            LOG.error("Exception thrown: {}", e);
                         }
                         return true;
                     }
@@ -159,10 +166,10 @@ private void stopThread() {
         }
     }
 
-    static private class TwitterRecord implements Record<String> {
-        private String tweet;
+    static private class TwitterRecord implements Record<TweetData> {
+        private final TweetData tweet;
 
-        public TwitterRecord(String tweet) {
+        public TwitterRecord(TweetData tweet) {
             this.tweet = tweet;
         }
 
@@ -173,7 +180,7 @@ public TwitterRecord(String tweet) {
         }
 
         @Override
-        public String getValue() {
+        public TweetData getValue() {
             return tweet;
         }
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message