beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [1/2] beam git commit: append to #2135, add 1). fix issue of NO_TIMESTAMP type in 10; 2). rename field to 'timestamp';
Date Fri, 24 Mar 2017 17:15:18 GMT
Repository: beam
Updated Branches:
  refs/heads/master 741242732 -> 5c2da7dc2


append to #2135, add
1). fix issue of NO_TIMESTAMP type in 10;
2). rename field to 'timestamp';


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f10509e7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f10509e7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f10509e7

Branch: refs/heads/master
Commit: f10509e745ff234110bc50d16aba1cb6813036b6
Parents: 7412427
Author: mingmxu <mingmxu@ebay.com>
Authored: Fri Mar 17 13:18:17 2017 -0700
Committer: Davor Bonaci <davor@google.com>
Committed: Fri Mar 24 10:14:33 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/kafka/ConsumerSpEL.java  | 43 +++++++++++++++++---
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 21 +++++++++-
 .../apache/beam/sdk/io/kafka/KafkaRecord.java   | 15 +++++--
 .../beam/sdk/io/kafka/KafkaRecordCoder.java     |  5 +++
 4 files changed, 73 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f10509e7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
index b92b6fa..8fe17c1 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
@@ -17,10 +17,14 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.Collection;
-
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.expression.Expression;
 import org.springframework.expression.ExpressionParser;
 import org.springframework.expression.spel.SpelParserConfiguration;
@@ -33,16 +37,28 @@ import org.springframework.expression.spel.support.StandardEvaluationContext;
  * to eliminate the method definition differences.
  */
 class ConsumerSpEL {
-  SpelParserConfiguration config = new SpelParserConfiguration(true, true);
-  ExpressionParser parser = new SpelExpressionParser(config);
+  private static final Logger LOG = LoggerFactory.getLogger(ConsumerSpEL.class);
+
+  private SpelParserConfiguration config = new SpelParserConfiguration(true, true);
+  private ExpressionParser parser = new SpelExpressionParser(config);
 
-  Expression seek2endExpression =
+  private Expression seek2endExpression =
       parser.parseExpression("#consumer.seekToEnd(#tp)");
 
-  Expression assignExpression =
+  private Expression assignExpression =
       parser.parseExpression("#consumer.assign(#tp)");
 
-  public ConsumerSpEL() {}
+  private Method timestampMethod;
+  private boolean hasRecordTimestamp = false;
+
+  public ConsumerSpEL() {
+    try {
+      timestampMethod = ConsumerRecord.class.getMethod("timestamp", (Class<?>[]) null);
+      hasRecordTimestamp = timestampMethod.getReturnType().equals(Long.TYPE);
+    } catch (NoSuchMethodException | SecurityException e) {
+      LOG.debug("Timestamp for Kafka message is not available.");
+    }
+  }
 
   public void evaluateSeek2End(Consumer consumer, TopicPartition topicPartitions) {
     StandardEvaluationContext mapContext = new StandardEvaluationContext();
@@ -57,4 +73,19 @@ class ConsumerSpEL {
     mapContext.setVariable("tp", topicPartitions);
     assignExpression.getValue(mapContext);
   }
+
+  public long getRecordTimestamp(ConsumerRecord<byte[], byte[]> rawRecord) {
+    long timestamp;
+    try {
+      //for Kafka 0.9, set to System.currentTimeMillis();
+      //for kafka 0.10, when NO_TIMESTAMP also set to System.currentTimeMillis();
+      if (!hasRecordTimestamp || (timestamp = (long) timestampMethod.invoke(rawRecord)) <= 0L) {
+        timestamp = System.currentTimeMillis();
+      }
+    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+      // Not expected. Method timestamp() is already checked.
+      throw new RuntimeException(e);
+    }
+    return timestamp;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f10509e7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 890fb2b..310392c 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -203,6 +203,14 @@ import org.slf4j.LoggerFactory;
  * {@link ProducerConfig} for sink. E.g. if you would like to enable offset
  * <em>auto commit</em> (for external monitoring or other purposes), you can set
  * <tt>"group.id"</tt>, <tt>"enable.auto.commit"</tt>, etc.
+ *
+ * <h3>Event Timestamp and Watermark</h3>
+ * By default record timestamp and watermark are based on processing time in KafkaIO reader.
+ * This can be overridden by providing {@code WatermarkFn} with
+ * {@link Read#withWatermarkFn(SerializableFunction)}, and {@code TimestampFn} with
+ * {@link Read#withTimestampFn(SerializableFunction)}.<br>
+ * Note that {@link KafkaRecord#getTimestamp()} reflects timestamp provided by Kafka if any,
+ * otherwise it is set to processing time.
  */
 public class KafkaIO {
   /**
@@ -428,6 +436,7 @@ public class KafkaIO {
       checkNotNull(getValueCoder(), "Value coder must be set");
     }
 
+    @Override
     public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
      // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
       Unbounded<KafkaRecord<K, V>> unbounded =
@@ -458,6 +467,7 @@ public class KafkaIO {
     private static <KeyT, ValueT, OutT> SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>
     unwrapKafkaAndThen(final SerializableFunction<KV<KeyT, ValueT>, OutT> fn) {
       return new SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>() {
+        @Override
         public OutT apply(KafkaRecord<KeyT, ValueT> record) {
           return fn.apply(record.getKV());
         }
@@ -499,6 +509,7 @@ public class KafkaIO {
     private static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
       KAFKA_CONSUMER_FACTORY_FN =
         new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>() {
+          @Override
           public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
             return new KafkaConsumer<>(config);
           }
@@ -627,6 +638,7 @@ public class KafkaIO {
       }
 
       Collections.sort(partitions, new Comparator<TopicPartition>() {
+        @Override
         public int compare(TopicPartition tp1, TopicPartition tp2) {
           return ComparisonChain
               .start()
@@ -750,6 +762,7 @@ public class KafkaIO {
     /** watermark before any records have been read. */
     private static Instant initialWatermark = new Instant(Long.MIN_VALUE);
 
+    @Override
     public String toString() {
       return name;
     }
@@ -800,13 +813,14 @@ public class KafkaIO {
     public UnboundedKafkaReader(
         UnboundedKafkaSource<K, V> source,
         @Nullable KafkaCheckpointMark checkpointMark) {
-
+      this.consumerSpEL = new ConsumerSpEL();
       this.source = source;
       this.name = "Reader-" + source.id;
 
       List<TopicPartition> partitions = source.spec.getTopicPartitions();
       partitionStates = ImmutableList.copyOf(Lists.transform(partitions,
           new Function<TopicPartition, PartitionState>() {
+            @Override
             public PartitionState apply(TopicPartition tp) {
               return new PartitionState(tp, UNINITIALIZED_OFFSET);
             }
@@ -886,7 +900,6 @@ public class KafkaIO {
 
     @Override
     public boolean start() throws IOException {
-      this.consumerSpEL = new ConsumerSpEL();
       Read<K, V> spec = source.spec;
       consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig());
       consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions());
@@ -909,6 +922,7 @@ public class KafkaIO {
       // Note that consumer is not thread safe, should not be accessed out side consumerPollLoop().
       consumerPollThread.submit(
           new Runnable() {
+            @Override
             public void run() {
               consumerPollLoop();
             }
@@ -929,6 +943,7 @@ public class KafkaIO {
 
       offsetFetcherThread.scheduleAtFixedRate(
           new Runnable() {
+            @Override
             public void run() {
               updateLatestOffsets();
             }
@@ -986,6 +1001,7 @@ public class KafkaIO {
               rawRecord.topic(),
               rawRecord.partition(),
               rawRecord.offset(),
+              consumerSpEL.getRecordTimestamp(rawRecord),
               decode(rawRecord.key(), source.spec.getKeyCoder()),
               decode(rawRecord.value(), source.spec.getValueCoder()));
 
@@ -1059,6 +1075,7 @@ public class KafkaIO {
       return new KafkaCheckpointMark(ImmutableList.copyOf(// avoid lazy (consumedOffset can change)
           Lists.transform(partitionStates,
               new Function<PartitionState, PartitionMark>() {
+                @Override
                 public PartitionMark apply(PartitionState p) {
                   return new PartitionMark(p.topicPartition.topic(),
                                            p.topicPartition.partition(),

http://git-wip-us.apache.org/repos/asf/beam/blob/f10509e7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
index fa202e1..e0e400e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.kafka;
 
 import java.io.Serializable;
 import java.util.Arrays;
+
 import org.apache.beam.sdk.values.KV;
 
 /**
@@ -31,25 +32,28 @@ public class KafkaRecord<K, V> implements Serializable {
   private final int partition;
   private final long offset;
   private final KV<K, V> kv;
+  private final long timestamp;
 
   public KafkaRecord(
       String topic,
       int partition,
       long offset,
+      long timestamp,
       K key,
       V value) {
-    this(topic, partition, offset, KV.of(key, value));
+    this(topic, partition, offset, timestamp, KV.of(key, value));
   }
 
   public KafkaRecord(
       String topic,
       int partition,
       long offset,
+      long timestamp,
       KV<K, V> kv) {
-
     this.topic = topic;
     this.partition = partition;
     this.offset = offset;
+    this.timestamp = timestamp;
     this.kv = kv;
   }
 
@@ -69,9 +73,13 @@ public class KafkaRecord<K, V> implements Serializable {
     return kv;
   }
 
+  public long getTimestamp() {
+    return timestamp;
+  }
+
   @Override
   public int hashCode() {
-    return Arrays.deepHashCode(new Object[]{topic, partition, offset, kv});
+    return Arrays.deepHashCode(new Object[]{topic, partition, offset, timestamp, kv});
   }
 
   @Override
@@ -82,6 +90,7 @@ public class KafkaRecord<K, V> implements Serializable {
       return topic.equals(other.topic)
           && partition == other.partition
           && offset == other.offset
+          && timestamp == other.timestamp
           && kv.equals(other.kv);
     } else {
       return false;

http://git-wip-us.apache.org/repos/asf/beam/blob/f10509e7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index ea78f09..2043a4c 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -19,10 +19,12 @@ package org.apache.beam.sdk.io.kafka;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
+
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
 @@ -66,6 +68,7 @@ public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
     stringCoder.encode(value.getTopic(), outStream, nested);
     intCoder.encode(value.getPartition(), outStream, nested);
     longCoder.encode(value.getOffset(), outStream, nested);
+    longCoder.encode(value.getTimestamp(), outStream, nested);
     kvCoder.encode(value.getKV(), outStream, context);
   }
 
 @@ -77,6 +80,7 @@ public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
         stringCoder.decode(inStream, nested),
         intCoder.decode(inStream, nested),
         longCoder.decode(inStream, nested),
+        longCoder.decode(inStream, nested),
         kvCoder.decode(inStream, context));
   }
 
 @@ -106,6 +110,7 @@ public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
           value.getTopic(),
           value.getPartition(),
           value.getOffset(),
+          value.getTimestamp(),
           (KV<Object, Object>) kvCoder.structuralValue(value.getKV()));
     }
   }


Mime
View raw message