kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-5844; add groupBy(selector, serialized) to Ktable
Date Thu, 07 Sep 2017 11:35:36 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9cbb9f093 -> 329d5fa64


KAFKA-5844; add groupBy(selector, serialized) to Ktable

add `KTable#groupBy(KeyValueMapper, Serialized)` and deprecate the overload with `Serde` params

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>,
Bill Bejeck <bill@confluent.io>

Closes #3802 from dguy/kip-182-ktable-groupby


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/329d5fa6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/329d5fa6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/329d5fa6

Branch: refs/heads/trunk
Commit: 329d5fa64a2a3ac1d39ac37fdacbf6e43d500d11
Parents: 9cbb9f0
Author: Damian Guy <damian.guy@gmail.com>
Authored: Thu Sep 7 12:35:31 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Thu Sep 7 12:35:31 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KTable.java    | 33 +++++++++++++++++++-
 .../kafka/streams/kstream/KeyValueMapper.java   |  4 +--
 .../streams/kstream/internals/KTableImpl.java   | 22 ++++++++-----
 .../kstream/internals/KTableAggregateTest.java  | 21 ++++++-------
 .../internals/KTableKTableLeftJoinTest.java     |  3 +-
 .../kafka/streams/tests/SmokeTestClient.java    |  3 +-
 6 files changed, 62 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/329d5fa6/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 06a0eee..4bc9572 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -1001,7 +1001,7 @@ public interface KTable<K, V> {
      * records to and rereading all update records from it, such that the resulting {@link
KGroupedTable} is partitioned
      * on the new key.
      * <p>
-     * If the key or value type is changed, it is recommended to use {@link #groupBy(KeyValueMapper,
Serde, Serde)}
+     * If the key or value type is changed, it is recommended to use {@link #groupBy(KeyValueMapper,
Serialized)}
      * instead.
      *
      * @param selector a {@link KeyValueMapper} that computes a new grouping key and value
to be aggregated
@@ -1012,6 +1012,35 @@ public interface KTable<K, V> {
     <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K,
? super V, KeyValue<KR, VR>> selector);
 
     /**
+     * Re-groups the records of this {@code KTable} using the provided {@link KeyValueMapper}
+     * and {@link Serde}s as specified by {@link Serialized}.
+     * Each {@link KeyValue} pair of this {@code KTable} is mapped to a new {@link KeyValue}
pair by applying the
+     * provided {@link KeyValueMapper}.
+     * Re-grouping a {@code KTable} is required before an aggregation operator can be applied
to the data
+     * (cf. {@link KGroupedTable}).
+     * The {@link KeyValueMapper} selects a new key and value (with should both have unmodified
type).
+     * If the new record key is {@code null} the record will not be included in the resulting
{@link KGroupedTable}
+     * <p>
+     * Because a new key is selected, an internal repartitioning topic will be created in
Kafka.
+     * This topic will be named "${applicationId}-XXX-repartition", where "applicationId"
is user-specified in
+     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"XXX" is
+     * an internally generated name, and "-repartition" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     * <p>
+     * All data of this {@code KTable} will be redistributed through the repartitioning topic
by writing all update
+     * records to and rereading all update records from it, such that the resulting {@link
KGroupedTable} is partitioned
+     * on the new key.
+     *
+     * @param selector      a {@link KeyValueMapper} that computes a new grouping key and
value to be aggregated
+     * @param serialized    the {@link Serialized} instance used to specify {@link org.apache.kafka.common.serialization.Serdes}
+     * @param <KR>          the key type of the result {@link KGroupedTable}
+     * @param <VR>          the value type of the result {@link KGroupedTable}
+     * @return a {@link KGroupedTable} that contains the re-grouped records of the original
{@code KTable}
+     */
+    <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K,
? super V, KeyValue<KR, VR>> selector,
+                                           final Serialized<KR, VR> serialized);
+
+    /**
      * Re-groups the records of this {@code KTable} using the provided {@link KeyValueMapper}.
      * Each {@link KeyValue} pair of this {@code KTable} is mapped to a new {@link KeyValue}
pair by applying the
      * provided {@link KeyValueMapper}.
@@ -1038,7 +1067,9 @@ public interface KTable<K, V> {
      * @param <KR>       the key type of the result {@link KGroupedTable}
      * @param <VR>       the value type of the result {@link KGroupedTable}
      * @return a {@link KGroupedTable} that contains the re-grouped records of the original
{@code KTable}
+     * @deprecated use {@link #groupBy(KeyValueMapper, Serialized)}
      */
+    @Deprecated
     <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K,
? super V, KeyValue<KR, VR>> selector,
                                            final Serde<KR> keySerde,
                                            final Serde<VR> valueSerde);

http://git-wip-us.apache.org/repos/asf/kafka/blob/329d5fa6/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
index e707fbb..2a56a05 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
@@ -39,9 +39,9 @@ import org.apache.kafka.streams.KeyValue;
  * @see KStream#flatMap(KeyValueMapper)
  * @see KStream#selectKey(KeyValueMapper)
  * @see KStream#groupBy(KeyValueMapper)
- * @see KStream#groupBy(KeyValueMapper, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
+ * @see KStream#groupBy(KeyValueMapper, Serialized)
  * @see KTable#groupBy(KeyValueMapper)
- * @see KTable#groupBy(KeyValueMapper, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
+ * @see KTable#groupBy(KeyValueMapper, Serialized)
  * @see KTable#toStream(KeyValueMapper)
  */
 public interface KeyValueMapper<K, V, VR> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/329d5fa6/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 46f2636..aed7fde 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.PrintForeachAction;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
@@ -609,21 +610,28 @@ public class KTableImpl<K, S, V> extends AbstractStream<K>
implements KTable<K,
     public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super
K, ? super V, KeyValue<K1, V1>> selector,
                                                   final Serde<K1> keySerde,
                                                   final Serde<V1> valueSerde) {
+        return groupBy(selector, Serialized.with(keySerde, valueSerde));
+    }
+
+    @Override
+    public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super
K, ? super V, KeyValue<K1, V1>> selector) {
+        return this.groupBy(selector, Serialized.<K1, V1>with(null, null));
+    }
+
+    @Override
+    public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super
K, ? super V, KeyValue<K1, V1>> selector,
+                                                  final Serialized<K1, V1> serialized)
{
         Objects.requireNonNull(selector, "selector can't be null");
+        Objects.requireNonNull(serialized, "serialized can't be null");
         String selectName = builder.newName(SELECT_NAME);
 
-        KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new
KTableRepartitionMap<K, V, K1, V1>(this, selector);
+        KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new
KTableRepartitionMap<>(this, selector);
 
         // select the aggregate key and values (old and new), it would require parent to
send old values
         builder.internalTopologyBuilder.addProcessor(selectName, selectSupplier, this.name);
         this.enableSendingOldValues();
 
-        return new KGroupedTableImpl<>(builder, selectName, this.name, keySerde, valueSerde);
-    }
-
-    @Override
-    public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super
K, ? super V, KeyValue<K1, V1>> selector) {
-        return this.groupBy(selector, null, null);
+        return new KGroupedTableImpl<>(builder, selectName, this.name, serialized.keySerde(),
serialized.valueSerde());
     }
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/kafka/blob/329d5fa6/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index f31e232..9347cc8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
@@ -49,6 +50,7 @@ import static org.junit.Assert.assertEquals;
 public class KTableAggregateTest {
 
     final private Serde<String> stringSerde = Serdes.String();
+    private final Serialized<String, String> stringSerialzied = Serialized.with(stringSerde,
stringSerde);
 
     private File stateDir = null;
 
@@ -70,8 +72,7 @@ public class KTableAggregateTest {
 
         KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1,
"anyStoreName");
         KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String,
String>NoOpKeyValueMapper(),
-                stringSerde,
-                stringSerde
+                                                       stringSerialzied
         ).aggregate(MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER,
                 MockAggregator.TOSTRING_REMOVER,
@@ -119,8 +120,7 @@ public class KTableAggregateTest {
 
         KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1,
"anyStoreName");
         KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String,
String>NoOpKeyValueMapper(),
-            stringSerde,
-            stringSerde
+                                                       stringSerialzied
         ).aggregate(MockInitializer.STRING_INIT,
             MockAggregator.TOSTRING_ADDER,
             MockAggregator.TOSTRING_REMOVER,
@@ -160,8 +160,7 @@ public class KTableAggregateTest {
                 }
                 }
             },
-                stringSerde,
-                stringSerde
+                stringSerialzied
         )
                 .aggregate(MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER,
@@ -234,7 +233,7 @@ public class KTableAggregateTest {
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
         builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
-                .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
stringSerde, stringSerde)
+                .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
stringSerialzied)
                 .count("count")
                 .toStream()
                 .process(proc);
@@ -249,7 +248,7 @@ public class KTableAggregateTest {
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
         builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
-            .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
stringSerde, stringSerde)
+            .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
stringSerialzied)
             .count()
             .toStream()
             .process(proc);
@@ -264,7 +263,7 @@ public class KTableAggregateTest {
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
         builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
-            .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
stringSerde, stringSerde)
+            .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
stringSerialzied)
             .count("count")
             .toStream()
             .process(proc);
@@ -299,7 +298,7 @@ public class KTableAggregateTest {
                     public KeyValue<String, String> apply(String key, String value)
{
                         return KeyValue.pair(String.valueOf(key.charAt(0)), String.valueOf(key.charAt(1)));
                     }
-                }, stringSerde, stringSerde)
+                }, stringSerialzied)
                 .aggregate(new Initializer<String>() {
 
                     @Override
@@ -358,7 +357,7 @@ public class KTableAggregateTest {
             public KeyValue<String, Long> apply(final Long key, final String value)
{
                 return new KeyValue<>(value, key);
             }
-        }, Serdes.String(), Serdes.Long())
+        }, Serialized.with(Serdes.String(), Serdes.Long()))
                 .reduce(new Reducer<Long>() {
                     @Override
                     public Long apply(final Long value1, final Long value2) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/329d5fa6/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index fe92f2b..781eb61 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -346,7 +347,7 @@ public class KTableKTableLeftJoinTest {
                     public KeyValue<Long, String> apply(final Long key, final String
value) {
                         return new KeyValue<>(key, value);
                     }
-                }, Serdes.Long(), Serdes.String()).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_ADDER,
"agg-store");
+                }, Serialized.with(Serdes.Long(), Serdes.String())).reduce(MockReducer.STRING_ADDER,
MockReducer.STRING_ADDER, "agg-store");
 
         final KTable<Long, String> one = builder.table(Serdes.Long(), Serdes.String(),
tableOne, tableOne);
         final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(),
tableTwo, tableTwo);

http://git-wip-us.apache.org/repos/asf/kafka/blob/329d5fa6/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index fc7a915..50c33e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -219,8 +219,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         // test repartition
         Agg agg = new Agg();
         cntTable.groupBy(agg.selector(),
-                         stringSerde,
-                         longSerde
+                         Serialized.with(stringSerde, longSerde)
         ).aggregate(agg.init(),
                     agg.adder(),
                     agg.remover(),


Mime
View raw message