pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] srkukarni closed pull request #2446: Add EventTime interface to Record
Date Sun, 26 Aug 2018 05:33:10 GMT
srkukarni closed pull request #2446: Add EventTime interface to Record
URL: https://github.com/apache/incubator-pulsar/pull/2446
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
index 2704909043..59cc104b05 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
@@ -46,6 +46,15 @@
      */
     T getValue();
 
+    /**
+     * Retrieves the event time of the record from the source.
+     *
+     * @return millis since epoch
+     */
+    default Optional<Long> getEventTime() {
+        return Optional.empty();
+    }
+
     /**
      * Retrieves the partition information if any of the record.
      *
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index c1da6862a7..c3df393592 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -22,6 +22,7 @@
 
 import java.util.Base64;
 import java.util.Map;
+import java.util.Optional;
 
 import lombok.AccessLevel;
 import lombok.Getter;
@@ -239,6 +240,12 @@ public void write(Record<T> record) throws Exception {
             msg.property("__pfn_input_topic__", pulsarRecord.getTopicName().get())
                .property("__pfn_input_msg_id__",
                          new String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
+        } else {
+            // It is coming from some source
+            Optional<Long> eventTime = sinkRecord.getSourceRecord().getEventTime();
+            if (eventTime.isPresent()) {
+                msg.eventTime(eventTime.get());
+            }
         }
 
         pulsarSinkProcessor.sendOutputMessage(msg, record);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 46c9213b83..359f48e04e 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -74,6 +74,15 @@ public T getValue() {
         return message.getValue();
     }
 
+    @Override
+    public Optional<Long> getEventTime() {
+        if (message.getEventTime() != 0) {
+            return Optional.of(message.getEventTime());
+        } else {
+            return Optional.empty();
+        }
+    }
+
     @Override
     public Optional<EncryptionContext> getEncryptionCtx() {
         return message.getEncryptionCtx();
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
index 3e5503d8cc..36d4dc8328 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
@@ -44,6 +44,7 @@
     private String timestampMs;
     private Delete delete;
 
+
     @Data
     public static class User {
         private Long id;
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
index 2631db1796..fc61945449 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
@@ -34,9 +34,12 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.Map;
 import java.util.Optional;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.SourceContext;
@@ -46,6 +49,7 @@
 /**
  * Simple Push based Twitter FireHose Source
  */
+@Slf4j
 public class TwitterFireHose extends PushSource<TweetData> {
 
     private static final Logger LOG = LoggerFactory.getLogger(TwitterFireHose.class);
@@ -126,7 +130,7 @@ public boolean process() throws IOException, InterruptedException {
                             // We don't really care if the record succeeds or not.
                             // However might be in the future to count failures
                             // TODO:- Figure out the metrics story for connectors
-                            consume(new TwitterRecord(tweet));
+                            consume(new TwitterRecord(tweet, config.getGuestimateTweetTime()));
                         } catch (Exception e) {
                             LOG.error("Exception thrown: {}", e);
                         }
@@ -166,9 +170,12 @@ private void stopThread() {
 
     static private class TwitterRecord implements Record<TweetData> {
         private final TweetData tweet;
+        private static SimpleDateFormat dateFormat = new SimpleDateFormat("EEE MMM d HH:mm:ss Z yyyy");
+        private final boolean guestimateTweetTime;
 
-        public TwitterRecord(TweetData tweet) {
+        public TwitterRecord(TweetData tweet, boolean guestimateTweetTime) {
             this.tweet = tweet;
+            this.guestimateTweetTime = guestimateTweetTime;
         }
 
         @Override
@@ -177,6 +184,22 @@ public TwitterRecord(TweetData tweet) {
             return Optional.empty();
         }
 
+        @Override
+        public Optional<Long> getEventTime() {
+            try {
+                if (tweet.getCreatedAt() != null) {
+                    Date d = dateFormat.parse(tweet.getCreatedAt());
+                    return Optional.of(d.toInstant().toEpochMilli());
+                } else if (guestimateTweetTime) {
+                    return Optional.of(System.currentTimeMillis());
+                } else {
+                    return Optional.empty();
+                }
+            } catch (Exception e) {
+                return Optional.empty();
+            }
+        }
+
         @Override
         public TweetData getValue() {
             return tweet;
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
index 83f1bafdf4..88acb3300d 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
@@ -44,6 +44,9 @@
     private String consumerSecret;
     private String token;
     private String tokenSecret;
+    // Most firehose events have null createdAt time. If this parameter is set to true
+    // then we estimate the createdTime of each firehose event to be current time.
+    private Boolean guestimateTweetTime = false;
 
     // ------ Optional property keys
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message