kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: revert streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
Date Thu, 10 Nov 2016 15:17:24 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6972d9476 -> cc62b4f84


revert streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cc62b4f8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cc62b4f8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cc62b4f8

Branch: refs/heads/trunk
Commit: cc62b4f844ca16eee974e75b736af87b7532de0d
Parents: 6972d94
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Thu Nov 10 07:12:53 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Nov 10 07:12:53 2016 -0800

----------------------------------------------------------------------
 .../ConsumerRecordTimestampExtractor.java       | 52 +++++---------------
 1 file changed, 11 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cc62b4f8/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
index 57a45f3..0d3424e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
@@ -18,53 +18,23 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.errors.StreamsException;
 
 /**
- * Retrieves embedded metadata timestamps from Kafka messages.
- * Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message"
for the new
- * 0.10+ Kafka message format.
- * <p>
- * Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically
and
- * transparently embed such timestamps into message metadata they send to Kafka, which can
then be retrieved
+ * Retrieves built-in timestamps from Kafka messages (introduced in KIP-32: Add timestamps
to Kafka message).
+ *
+ * Here, "built-in" refers to the fact that compatible Kafka producer clients automatically
and
+ * transparently embed such timestamps into messages they sent to Kafka, which can then be
retrieved
  * via this timestamp extractor.
- * <p>
- * If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka
broker setting
- * {@code message.timestamp.type} and Kafka topic setting {@code log.message.timestamp.type}),
- * this extractor effectively provides <i>event-time</i> semantics.
- * If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded
metadata timestamps,
- * using this extractor effectively provides <i>ingestion-time</i> semantics.
- * <p>
- * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
- * <p>
- * If a record has a negative (invalid) timestamp value, this extractor raises an exception.
  *
- * @see RobustConsumerRecordTimestampExtractor
- * @see InferringConsumerRecordTimestampExtractor
- * @see WallclockTimestampExtractor
+ * If <i>CreateTime</i> is used to define the built-in timestamps, using this
extractor effectively provide
+ * <i>event-time</i> semantics. If <i>LogAppendTime</i> is used to
define the built-in timestamps, using
+ * this extractor effectively provides <i>ingestion-time</i> semantics.
+ *
+ * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
  */
 public class ConsumerRecordTimestampExtractor implements TimestampExtractor {
-
-    /**
-     * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}.
-     *
-     * @param record a data record
-     * @param currentStreamsTime the current value of the internally tracked Streams time
(could be -1 if unknown)
-     * @return the embedded metadata timestamp of the given {@link ConsumerRecord}
-     * @throws StreamsException if the embedded metadata timestamp is negative
-     */
     @Override
-    public long extract(final ConsumerRecord<Object, Object> record, final long currentStreamsTime)
{
-        final long timestamp = record.timestamp();
-
-        if (timestamp < 0) {
-            throw new StreamsException("Input record " + record + " has invalid (negative)
timestamp. " +
-                    "Possibly because a pre-0.10 producer client was used to write this record
to Kafka without embedding a timestamp, " +
-                    "or because the input topic was created before upgrading the Kafka cluster
to 0.10+. " +
-                    "Use a different TimestampExtractor to process this data.");
-        }
-
-        return timestamp;
+    public long extract(ConsumerRecord<Object, Object> record) {
+        return record.timestamp();
     }
-
 }


Mime
View raw message