kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 1.0 updated: MINOR: Add missing imports to 'Hello Kafka Streams' examples (#4535)
Date Thu, 08 Feb 2018 17:55:29 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new e4cfbe8  MINOR: Add missing imports to 'Hello Kafka Streams' examples (#4535)
e4cfbe8 is described below

commit e4cfbe8b2cf9b5568960b5eb1c578a4081a52390
Author: Daniel Wojda <wojdadan@gmail.com>
AuthorDate: Thu Feb 8 17:53:57 2018 +0000

    MINOR: Add missing imports to 'Hello Kafka Streams' examples (#4535)
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 docs/streams/index.html | 45 +++++++++++++++++++++++++--------------------
 1 file changed, 25 insertions(+), 20 deletions(-)

diff --git a/docs/streams/index.html b/docs/streams/index.html
index 90a39a3..b39a7d3 100644
--- a/docs/streams/index.html
+++ b/docs/streams/index.html
@@ -153,26 +153,28 @@
            <div class="code-example__snippet b-java-8 selected">
                <pre class="brush: java;">
                    import org.apache.kafka.common.serialization.Serdes;
+                   import org.apache.kafka.common.utils.Bytes;
                    import org.apache.kafka.streams.KafkaStreams;
                    import org.apache.kafka.streams.StreamsBuilder;
                    import org.apache.kafka.streams.StreamsConfig;
-                   import org.apache.kafka.streams.Topology;
+                   import org.apache.kafka.streams.kstream.KStream;
+                   import org.apache.kafka.streams.kstream.KTable;
                    import org.apache.kafka.streams.kstream.Materialized;
                    import org.apache.kafka.streams.kstream.Produced;
                    import org.apache.kafka.streams.state.KeyValueStore;
-       
+
                    import java.util.Arrays;
                    import java.util.Properties;
-       
+
                    public class WordCountApplication {
-       
+
                        public static void main(final String[] args) throws Exception {
                            Properties config = new Properties();
                            config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
                            config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
                            config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                            config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-       
+
                            StreamsBuilder builder = new StreamsBuilder();
                            KStream&lt;String, String&gt; textLines = builder.stream("TextLinesTopic");
                            KTable&lt;String, Long&gt; wordCounts = textLines
@@ -180,11 +182,11 @@
                                .groupBy((key, word) -> word)
                                .count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("counts-store"));
                            wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
-       
+
                            KafkaStreams streams = new KafkaStreams(builder.build(), config);
                            streams.start();
                        }
-       
+
                    }
                </pre>
            </div>
@@ -192,14 +194,16 @@
            <div class="code-example__snippet b-java-7">
                <pre class="brush: java;">
                    import org.apache.kafka.common.serialization.Serdes;
+                   import org.apache.kafka.common.utils.Bytes;
                    import org.apache.kafka.streams.KafkaStreams;
                    import org.apache.kafka.streams.StreamsBuilder;
                    import org.apache.kafka.streams.StreamsConfig;
-                   import org.apache.kafka.streams.Topology;
+                   import org.apache.kafka.streams.kstream.KStream;
+                   import org.apache.kafka.streams.kstream.KTable;
+                   import org.apache.kafka.streams.kstream.ValueMapper;
                    import org.apache.kafka.streams.kstream.KeyValueMapper;
                    import org.apache.kafka.streams.kstream.Materialized;
                    import org.apache.kafka.streams.kstream.Produced;
-                   import org.apache.kafka.streams.kstream.ValueMapper;
                    import org.apache.kafka.streams.state.KeyValueStore;
 
                    import java.util.Arrays;
@@ -247,16 +251,17 @@
                    import java.lang.Long
                    import java.util.Properties
                    import java.util.concurrent.TimeUnit
-       
+
                    import org.apache.kafka.common.serialization._
+                   import org.apache.kafka.common.utils.Bytes
                    import org.apache.kafka.streams._
-                   import org.apache.kafka.streams.kstream.{KeyValueMapper, Materialized, Produced, ValueMapper}
-                   import org.apache.kafka.streams.state.KeyValueStore;
-       
+                   import org.apache.kafka.streams.kstream.{KStream, KTable, Materialized, Produced}
+                   import org.apache.kafka.streams.state.KeyValueStore
+
                    import scala.collection.JavaConverters.asJavaIterableConverter
-       
+
                    object WordCountApplication {
-       
+
                        def main(args: Array[String]) {
                            val config: Properties = {
                                val p = new Properties()
@@ -266,23 +271,23 @@
                                p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
                                p
                            }
-       
+
                            val builder: StreamsBuilder = new StreamsBuilder()
                            val textLines: KStream[String, String] = builder.stream("TextLinesTopic")
                            val wordCounts: KTable[String, Long] = textLines
                                .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
                                .groupBy((_, word) => word)
                                .count(Materialized.as("counts-store").asInstanceOf[Materialized[String, Long, KeyValueStore[Bytes, Array[Byte]]]])
-                           wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()))
-       
+                           wordCounts.toStream().to("WordsWithCountsTopic", Produced.`with`(Serdes.String(), Serdes.Long()))
+
                            val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
                            streams.start()
-       
+
                            Runtime.getRuntime.addShutdownHook(new Thread(() => {
                                streams.close(10, TimeUnit.SECONDS)
                            }))
                        }
-       
+
                    }
                </pre>
            </div>

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message