kafka-commits mailing list archives

From: guozh...@apache.org
Subject: kafka git commit: MINOR: add upgrade guide for Kafka Streams API
Date: Wed, 09 Nov 2016 23:31:22 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 fd2a9b4e2 -> 10cfc1628


MINOR: add upgrade guide for Kafka Streams API

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Michael G. Noll, Eno Thereska

Closes #2114 from mjsax/updateDocUpgradeSection

(cherry picked from commit 6972d9476fc036d5fdcc59fb82fb20f063091df8)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/10cfc162
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/10cfc162
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/10cfc162

Branch: refs/heads/0.10.1
Commit: 10cfc1628df024f7596d3af5c168fa90f59035ca
Parents: fd2a9b4
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Wed Nov 9 15:31:11 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Nov 9 15:31:19 2016 -0800

----------------------------------------------------------------------
 docs/upgrade.html                               | 37 ++++++++++++++
 .../ConsumerRecordTimestampExtractor.java       | 52 +++++++++++++++-----
 2 files changed, 78 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/10cfc162/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index e9fef1f..e6b9747 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -53,6 +53,43 @@ only support 0.10.1.x or later brokers while 0.10.1.x brokers also support older
     <li> Due to the increased number of index files, on some brokers with a large number of log segments (e.g. >15K), the log loading process during broker startup could be longer. Based on our experiment, setting num.recovery.threads.per.data.dir to one may reduce the log loading time. </li>
 </ul>
 
+<h5><a id="upgrade_1010_streams" href="#upgrade_1010_streams">Streams API changes in 0.10.1.0</a></h5>
+<ul>
+    <li> Stream grouping and aggregation split into two methods:
+        <ul>
+            <li> old: KStream #aggregateByKey(), #reduceByKey(), and #countByKey() </li>
+            <li> new: KStream#groupByKey() plus KGroupedStream #aggregate(), #reduce(), and #count() </li>
+            <li> Example: stream.countByKey() changes to stream.groupByKey().count() </li>
+        </ul>
+    </li>
+    <li> Auto Repartitioning:
+        <ul>
+            <li> a call to through() after a key-changing operator and before an aggregation/join is no longer required </li>
+            <li> Example: stream.selectKey(...).through(...).countByKey() changes to stream.selectKey(...).groupByKey().count() </li>
+        </ul>
+    </li>
+    <li> TopologyBuilder:
+        <ul>
+            <li> methods #sourceTopics(String applicationId) and #topicGroups(String applicationId) were simplified to #sourceTopics() and #topicGroups() </li>
+        </ul>
+    </li>
+    <li> DSL: new parameter to specify state store names:
+        <ul>
+            <li> The new Interactive Queries feature requires specifying a store name for all source KTables and window aggregation result KTables (the previous parameter "operator/window name" is now the storeName) </li>
+            <li> KStreamBuilder#table(String topic) changes to #table(String topic, String storeName) </li>
+            <li> KTable#through(String topic) changes to #through(String topic, String storeName) </li>
+            <li> KGroupedStream #aggregate(), #reduce(), and #count() require an additional parameter "String storeName" </li>
+            <li> Example: stream.countByKey(TimeWindows.of("windowName", 1000)) changes to stream.groupByKey().count(TimeWindows.of(1000), "countStoreName") </li>
+        </ul>
+    </li>
+    <li> Windowing:
+        <ul>
+            <li> Windows are not named anymore: TimeWindows.of("name", 1000) changes to TimeWindows.of(1000) (cf. DSL: new parameter to specify state store names) </li>
+            <li> JoinWindows has no default size anymore: JoinWindows.of("name").within(1000) changes to JoinWindows.of(1000) </li>
+        </ul>
+    </li>
+</ul>
+
 <h5><a id="upgrade_1010_notable" href="#upgrade_1010_notable">Notable changes in 0.10.1.0</a></h5>
 <ul>
     <li> The new Java consumer is no longer in beta and we recommend it for all new development. The old Scala consumers are still supported, but they will be deprecated in the next release
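
For readers migrating application code, the grouping/aggregation and auto-repartitioning notes above translate into a change like the one sketched below. This is a minimal illustration against the 0.10.1.0 Streams API and is not part of the patch; the topic names ("words-input", "words-count-output") and the store name "Counts" are made-up examples, and serde configuration is assumed to happen elsewhere.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class GroupingMigrationExample {

    public static void main(final String[] args) {
        final KStreamBuilder builder = new KStreamBuilder();

        // Hypothetical input topic; key/value types are Strings for simplicity.
        final KStream<String, String> words = builder.stream("words-input");

        // 0.10.0.x style (removed in 0.10.1.0):
        //     words.selectKey((key, value) -> value)
        //          .through("words-repartition")   // manual repartitioning before the aggregation
        //          .countByKey("Counts");
        //
        // 0.10.1.0 style: grouping is an explicit step and the repartition topic is created automatically.
        final KTable<String, Long> counts = words
                .selectKey((key, value) -> value)
                .groupByKey()      // replaces the removed countByKey()/reduceByKey()/aggregateByKey()
                .count("Counts");  // the state store name is now an explicit parameter

        counts.to(Serdes.String(), Serdes.Long(), "words-count-output");
    }
}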
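The mandatory store names and the unnamed windows fit together as in the following sketch. Again, this is only an illustration rather than part of the commit; the topics "user-profiles" and "clicks", the store names, and the window sizes are arbitrary assumptions.

import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class StoreNameMigrationExample {

    public static void main(final String[] args) {
        final KStreamBuilder builder = new KStreamBuilder();

        // Source KTables now take an explicit state store name as the second parameter.
        final KTable<String, String> profiles = builder.table("user-profiles", "user-profiles-store");

        // Windowed aggregation: the window itself is unnamed; the store name moves to count().
        // 0.10.0.x style: clicks.countByKey(TimeWindows.of("clicks-window", 60000));
        final KStream<String, String> clicks = builder.stream("clicks");
        final KTable<Windowed<String>, Long> clicksPerMinute =
                clicks.groupByKey().count(TimeWindows.of(60000), "clicks-per-minute-store");

        // Join windows no longer have a default size, so the size goes directly into of().
        // 0.10.0.x style: JoinWindows.of("join-window").within(5000);
        final JoinWindows joinWindow = JoinWindows.of(5000);
    }
}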

http://git-wip-us.apache.org/repos/asf/kafka/blob/10cfc162/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
index 0d3424e..57a45f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
@@ -18,23 +18,53 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.errors.StreamsException;
 
 /**
- * Retrieves built-in timestamps from Kafka messages (introduced in KIP-32: Add timestamps to Kafka message).
- *
- * Here, "built-in" refers to the fact that compatible Kafka producer clients automatically
and
- * transparently embed such timestamps into messages they sent to Kafka, which can then be
retrieved
+ * Retrieves embedded metadata timestamps from Kafka messages.
+ * Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new
+ * 0.10+ Kafka message format.
+ * <p>
+ * Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically
and
+ * transparently embed such timestamps into message metadata they send to Kafka, which can
then be retrieved
  * via this timestamp extractor.
- *
- * If <i>CreateTime</i> is used to define the built-in timestamps, using this extractor effectively provide
- * <i>event-time</i> semantics. If <i>LogAppendTime</i> is used to define the built-in timestamps, using
- * this extractor effectively provides <i>ingestion-time</i> semantics.
- *
+ * <p>
+ * If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka broker setting
+ * {@code log.message.timestamp.type} and Kafka topic setting {@code message.timestamp.type}),
+ * this extractor effectively provides <i>event-time</i> semantics.
+ * If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded metadata timestamps,
+ * using this extractor effectively provides <i>ingestion-time</i> semantics.
+ * <p>
  * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
+ * <p>
+ * If a record has a negative (invalid) timestamp value, this extractor raises an exception.
+ *
+ * @see RobustConsumerRecordTimestampExtractor
+ * @see InferringConsumerRecordTimestampExtractor
+ * @see WallclockTimestampExtractor
  */
 public class ConsumerRecordTimestampExtractor implements TimestampExtractor {
+
+    /**
+     * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}.
+     *
+     * @param record a data record
+     * @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown)
+     * @return the embedded metadata timestamp of the given {@link ConsumerRecord}
+     * @throws StreamsException if the embedded metadata timestamp is negative
+     */
     @Override
-    public long extract(ConsumerRecord<Object, Object> record) {
-        return record.timestamp();
+    public long extract(final ConsumerRecord<Object, Object> record, final long currentStreamsTime) {
+        final long timestamp = record.timestamp();
+
+        if (timestamp < 0) {
+            throw new StreamsException("Input record " + record + " has invalid (negative) timestamp. " +
+                    "Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " +
+                    "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " +
+                    "Use a different TimestampExtractor to process this data.");
+        }
+
+        return timestamp;
     }
+
 }
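
Because ConsumerRecordTimestampExtractor now fails hard on negative timestamps, applications that still consume pre-0.10 data need a more lenient extractor. Below is a minimal sketch of such a fallback extractor, written against the two-argument extract() signature shown in the diff above; the class name and the fallback policy are illustrative and are not part of this commit.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

/**
 * Illustrative only: a lenient extractor that never throws. If the embedded metadata
 * timestamp is invalid (negative), it falls back to the internally tracked Streams time,
 * and to wall-clock time if that is also unknown (-1).
 */
public class LenientTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long currentStreamsTime) {
        final long timestamp = record.timestamp();
        if (timestamp >= 0) {
            return timestamp;                  // valid embedded metadata timestamp
        }
        if (currentStreamsTime >= 0) {
            return currentStreamsTime;         // reuse the last timestamp Streams has seen
        }
        return System.currentTimeMillis();     // last resort: fall back to processing time
    }
}

Registering it would be a single config entry, e.g. props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, LenientTimestampExtractor.class.getName()).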

