kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Put state args in correct order named repartition test (#6114)
Date Mon, 14 Jan 2019 00:57:18 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eb61410  MINOR: Put state args in correct order named repartition test (#6114)
eb61410 is described below

commit eb61410a002e95a623425cc89348216a8da56ffe
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Sun Jan 13 19:57:08 2019 -0500

    MINOR: Put state args in correct order named repartition test (#6114)
    
    Another system test that needs to be updated with states in the correct order
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
index f836baa..911716f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.tests;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreams.State;
@@ -31,6 +32,7 @@ import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.time.Duration;
 import java.util.Objects;
@@ -76,7 +78,7 @@ public class StreamsNamedRepartitionTest {
         }
 
         maybeUpdatedStream.groupByKey(Grouped.with("grouped-stream", Serdes.String(), Serdes.String()))
-            .aggregate(initializer, aggregator, Materialized.with(Serdes.String(), Serdes.Integer()))
+            .aggregate(initializer, aggregator, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("count-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer()))
             .toStream()
             .peek((k, v) -> System.out.println(String.format("AGGREGATED key=%s value=%s", k, v)))
             .to(aggregationTopic, Produced.with(Serdes.String(), Serdes.Integer()));
@@ -95,7 +97,7 @@ public class StreamsNamedRepartitionTest {
         final KafkaStreams streams = new KafkaStreams(topology, config);
 
 
-        streams.setStateListener((oldState, newState) -> {
+        streams.setStateListener((newState, oldState) -> {
             if (oldState == State.REBALANCING && newState == State.RUNNING) {
                 if (addOperators) {
                     System.out.println("UPDATED Topology");


Mime
View raw message