kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Make Serdes less confusing in Scala (#4963)
Date Tue, 08 May 2018 16:15:39 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b88d70b  MINOR: Make Serdes less confusing in Scala (#4963)
b88d70b is described below

commit b88d70b53290af715034a1f772a271f7e44505fd
Author: Joan Goyeau <joan@goyeau.com>
AuthorDate: Tue May 8 17:15:31 2018 +0100

    MINOR: Make Serdes less confusing in Scala (#4963)
    
    Serdes are confusing in the Scala wrapper:
    
    * We have wrappers around Serializer, Deserializer and Serde which are not very useful.
    * We have Serdes in 2 places, org.apache.kafka.common.serialization.Serde and DefaultSerdes;
      instead we should have only one place to find all the Serdes.
    
    I wanted to do this PR before the release as this is a breaking change.
    This shouldn't add any new functionality, so the current tests should be enough.
    
    Reviewers: Debasish Ghosh <dghosh@acm.org>, Guozhang Wang <guozhang@confluent.io>
---
 docs/streams/developer-guide/dsl-api.html          | 11 ++--
 docs/streams/index.html                            |  2 +-
 .../apache/kafka/streams/scala/DefaultSerdes.scala | 47 --------------
 .../apache/kafka/streams/scala/ScalaSerde.scala    | 70 ---------------------
 .../org/apache/kafka/streams/scala/Serdes.scala    | 71 ++++++++++++++++++++++
 .../kafka/streams/scala/StreamsBuilder.scala       |  4 +-
 .../kafka/streams/scala/kstream/KStream.scala      | 12 ++--
 ...bleJoinScalaIntegrationTestImplicitSerdes.scala | 16 ++---
 .../apache/kafka/streams/scala/TopologyTest.scala  | 14 ++---
 .../apache/kafka/streams/scala/WordCountTest.scala | 10 +--
 10 files changed, 105 insertions(+), 152 deletions(-)
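
For readers migrating a custom serde away from the removed `ScalaSerde` / `SimpleScalaSerde` traits, the single-argument `Serdes.fromFn` overload added in `Serdes.scala` by this commit could be used roughly as follows. This is a minimal sketch and not code from the PR: `UserClicks`, its text encoding and the topic name are hypothetical, and it assumes a kafka-streams-scala artifact built from this commit is on the classpath.

```scala
import java.nio.charset.StandardCharsets.UTF_8

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.Serdes

import scala.util.Try

object UserClicksSerdeSketch {
  // Hypothetical domain type, used only for illustration.
  final case class UserClicks(clicks: Long)

  // Build a Serde from two plain functions.
  // fromFn hands `data` to these functions unchanged, so null is handled explicitly here.
  implicit val userClicksSerde: Serde[UserClicks] = Serdes.fromFn[UserClicks](
    (uc: UserClicks) => Option(uc).map(_.clicks.toString.getBytes(UTF_8)).orNull,
    (bytes: Array[Byte]) =>
      Option(bytes).flatMap(b => Try(UserClicks(new String(b, UTF_8).toLong)).toOption)
  )

  def main(args: Array[String]): Unit = {
    // Round trip through the generated serializer and deserializer.
    val bytes    = userClicksSerde.serializer().serialize("some-topic", UserClicks(42L))
    val restored = userClicksSerde.deserializer().deserialize("some-topic", bytes)
    println(restored)
  }
}
```

One difference from the removed `Serializer` / `Deserializer` wrapper traits is visible in the diff: those wrapped the input in `Option(data)` before calling the user function, whereas `fromFn` does not, which is why the sketch guards against null itself.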

diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index 2b25072..687dff9 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -3165,8 +3165,7 @@ t=5 (blue), which lead to a merge of sessions and an extension of a session, res
             <p>The library also has several utility abstractions and modules that the user needs to use for proper semantics.</p>
             <ul>
               <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.ImplicitConversions</span></code>: Module that brings into scope the implicit conversions between the Scala and Java classes.</li>
-              <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.DefaultSerdes</span></code>: Module that brings into scope the implicit values of all primitive SerDes.</li>
-              <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.ScalaSerde</span></code>: Base abstraction that can be used to implement custom SerDes in a type safe way.</li>
+              <li><code class="docutils literal"><span class="pre">org.apache.kafka.streams.scala.Serdes</span></code>: Module that contains all primitive SerDes that can be imported as implicits and a helper to create custom SerDes.</li>
             </ul>
             <p>The library is cross-built with Scala 2.11 and 2.12.  To reference the library compiled against Scala 2.11 include the following in your maven <code>pom.xml</code> add the following:</p>
             <pre class="brush: xml;">
@@ -3197,7 +3196,7 @@ import org.apache.kafka.streams.scala.kstream._
 import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
 
 object WordCountApplication extends App {
-  import DefaultSerdes._
+  import Serdes._
 
   val config: Properties = {
     val p = new Properties()
@@ -3235,7 +3234,7 @@ object WordCountApplication extends App {
 // that will set up all Serialized, Produced, Consumed and Joined instances.
 // So all APIs below that accept Serialized, Produced, Consumed or Joined will
 // get these instances automatically
-import DefaultSerdes._
+import Serdes._
 
 val builder = new StreamsBuilder()
 
@@ -3260,7 +3259,7 @@ clicksPerRegion.toStream.to(outputTopic)
               <p>Quite a few things are going on in the above code snippet that may warrant a few lines of elaboration:</p>
               <ol>
                 <li>The code snippet does not depend on any config defined SerDes. In fact any SerDes defined as part of the config will be ignored.</li>
-                <li>All SerDes are picked up from the implicits in scope. And <code class="docutils literal"><span class="pre">import DefaultSerdes._</span></code> brings all necessary SerDes in scope.</li>
+                <li>All SerDes are picked up from the implicits in scope. And <code class="docutils literal"><span class="pre">import Serdes._</span></code> brings all necessary SerDes in scope.</li>
                 <li>This is an example of compile time type safety that we don't have in the Java APIs.</li>
                 <li>The code looks less verbose and more focused towards the actual transformation that it does on the data stream.</li>
               </ol>
@@ -3277,7 +3276,7 @@ case class UserClicks(clicks: Long)
 implicit val userClicksSerde: Serde[UserClicks] = new AvroSerde
 
 // Primitive SerDes
-import DefaultSerdes._
+import Serdes._
 
 // And then business as usual ..
 
diff --git a/docs/streams/index.html b/docs/streams/index.html
index 72e1323..6dfaf6b 100644
--- a/docs/streams/index.html
+++ b/docs/streams/index.html
@@ -261,7 +261,7 @@ import org.apache.kafka.streams.scala.kstream._
 import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
 
 object WordCountApplication extends App {
-  import DefaultSerdes._
+  import Serdes._
 
   val config: Properties = {
     val p = new Properties()
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/DefaultSerdes.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/DefaultSerdes.scala
deleted file mode 100644
index 3f2840e..0000000
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/DefaultSerdes.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
- * Copyright (C) 2017-2018 Alexis Seigneurin.
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.scala
-
-import java.nio.ByteBuffer
-
-import org.apache.kafka.common.serialization.{Serde, Serdes}
-import org.apache.kafka.common.utils.Bytes
-import org.apache.kafka.streams.kstream.WindowedSerdes
-
-
-/**
- * Implicit values for default serdes.
- * <p>
- * Bring them in scope for default serializers / de-serializers to work.
- */
-object DefaultSerdes {
-  implicit val stringSerde: Serde[String] = Serdes.String()
-  implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
-  implicit val byteArraySerde: Serde[Array[Byte]] = Serdes.ByteArray()
-  implicit val bytesSerde: Serde[Bytes] = Serdes.Bytes()
-  implicit val floatSerde: Serde[Float] = Serdes.Float().asInstanceOf[Serde[Float]]
-  implicit val doubleSerde: Serde[Double] = Serdes.Double().asInstanceOf[Serde[Double]]
-  implicit val integerSerde: Serde[Int] = Serdes.Integer().asInstanceOf[Serde[Int]]
-  implicit val shortSerde: Serde[Short] = Serdes.Short().asInstanceOf[Serde[Short]]
-  implicit val byteBufferSerde: Serde[ByteBuffer] = Serdes.ByteBuffer()
-
-  implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new WindowedSerdes.TimeWindowedSerde[T]()
-  implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] = new WindowedSerdes.SessionWindowedSerde[T]()
-}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ScalaSerde.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ScalaSerde.scala
deleted file mode 100644
index 06afcae..0000000
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ScalaSerde.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
- * Copyright (C) 2017-2018 Alexis Seigneurin.
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.scala
-
-import org.apache.kafka.common.serialization.{Serde, Deserializer => JDeserializer, Serializer => JSerializer}
-
-trait ScalaSerde[T] extends Serde[T] {
-  override def deserializer(): JDeserializer[T]
-
-  override def serializer(): JSerializer[T]
-
-  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
-
-  override def close(): Unit = ()
-}
-
-trait SimpleScalaSerde[T >: Null] extends Serde[T] with ScalaSerde[T] {
-  def serialize(data: T): Array[Byte]
-  def deserialize(data: Array[Byte]): Option[T]
-
-  private def outerSerialize(data: T): Array[Byte] = serialize(data)
-  private def outerDeserialize(data: Array[Byte]): Option[T] = deserialize(data)
-
-  override def deserializer(): Deserializer[T] = new Deserializer[T] {
-    override def deserialize(data: Array[Byte]): Option[T] = outerDeserialize(data)
-  }
-
-  override def serializer(): Serializer[T] = new Serializer[T] {
-    override def serialize(data: T): Array[Byte] = outerSerialize(data)
-  }
-}
-
-trait Deserializer[T >: Null] extends JDeserializer[T] {
-  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
-
-  override def close(): Unit = ()
-
-  override def deserialize(topic: String, data: Array[Byte]): T =
-    Option(data).flatMap(deserialize).orNull
-
-  def deserialize(data: Array[Byte]): Option[T]
-}
-
-trait Serializer[T] extends JSerializer[T] {
-  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
-
-  override def close(): Unit = ()
-
-  override def serialize(topic: String, data: T): Array[Byte] =
-    Option(data).map(serialize).orNull
-
-  def serialize(data: T): Array[Byte]
-}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
new file mode 100644
index 0000000..a0ffffa
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import java.util
+
+import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer, Serdes => JSerdes}
+import org.apache.kafka.streams.kstream.WindowedSerdes
+
+object Serdes {
+  implicit val String: Serde[String]                             = JSerdes.String()
+  implicit val Long: Serde[Long]                                 = JSerdes.Long().asInstanceOf[Serde[Long]]
+  implicit val JavaLong: Serde[java.lang.Long]                   = JSerdes.Long()
+  implicit val ByteArray: Serde[Array[Byte]]                     = JSerdes.ByteArray()
+  implicit val Bytes: Serde[org.apache.kafka.common.utils.Bytes] = JSerdes.Bytes()
+  implicit val Float: Serde[Float]                               = JSerdes.Float().asInstanceOf[Serde[Float]]
+  implicit val JavaFloat: Serde[java.lang.Float]                 = JSerdes.Float()
+  implicit val Double: Serde[Double]                             = JSerdes.Double().asInstanceOf[Serde[Double]]
+  implicit val JavaDouble: Serde[java.lang.Double]               = JSerdes.Double()
+  implicit val Integer: Serde[Int]                               = JSerdes.Integer().asInstanceOf[Serde[Int]]
+  implicit val JavaInteger: Serde[java.lang.Integer]             = JSerdes.Integer()
+
+  implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new WindowedSerdes.TimeWindowedSerde[T]()
+  implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] = new WindowedSerdes.SessionWindowedSerde[T]()
+
+  def fromFn[T >: Null](serializer: T => Array[Byte], deserializer: Array[Byte] => Option[T]): Serde[T] =
+    JSerdes.serdeFrom(
+      new Serializer[T] {
+        override def serialize(topic: String, data: T): Array[Byte]                = serializer(data)
+        override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
+        override def close(): Unit                                                 = ()
+      },
+      new Deserializer[T] {
+        override def deserialize(topic: String, data: Array[Byte]): T              = deserializer(data).orNull
+        override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
+        override def close(): Unit                                                 = ()
+      }
+    )
+
+  def fromFn[T >: Null](serializer: (String, T) => Array[Byte],
+    deserializer: (String, Array[Byte]) => Option[T]): Serde[T] =
+    JSerdes.serdeFrom(
+      new Serializer[T] {
+        override def serialize(topic: String, data: T): Array[Byte]                = serializer(topic, data)
+        override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
+        override def close(): Unit                                                 = ()
+      },
+      new Deserializer[T] {
+        override def deserialize(topic: String, data: Array[Byte]): T              = deserializer(topic, data).orNull
+        override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
+        override def close(): Unit                                                 = ()
+      }
+    )
+}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index 9e6e204..397af32 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -48,7 +48,7 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
    * import ImplicitConversions._
    *
    * // Bring implicit default serdes in scope
-   * import DefaultSerdes._
+   * import Serdes._
    *
    * val builder = new StreamsBuilder()
    *
@@ -98,7 +98,7 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
    * import ImplicitConversions._
    *
    * // Bring implicit default serdes in scope
-   * import DefaultSerdes._
+   * import Serdes._
    *
    * val builder = new StreamsBuilder()
    *
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 7634b95..e3e8470 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -206,7 +206,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * Example:
    *
    * // brings implicit serdes in scope
-   * import DefaultSerdes._
+   * import Serdes._
    *
    * //..
    * val clicksPerRegion: KTable[String, Long] = //..
@@ -238,7 +238,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * Example:
    *
    * // brings implicit serdes in scope
-   * import DefaultSerdes._
+   * import Serdes._
    *
    * //..
    * val clicksPerRegion: KTable[String, Long] = //..
@@ -354,7 +354,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * Example:
    *
    * // brings implicit serdes in scope
-   * import DefaultSerdes._
+   * import Serdes._
    *
    * val clicksPerRegion: KTable[String, Long] =
    *   userClicksStream
@@ -362,7 +362,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    *     .map((_, regionWithClicks) => regionWithClicks)
    *
    *     // the groupByKey gets the Serialized instance through an implicit conversion of the
-   *     // serdes brought into scope through the import DefaultSerdes._ above
+   *     // serdes brought into scope through the import Serdes._ above
    *     .groupByKey
    *     .reduce(_ + _)
    *
@@ -388,7 +388,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * Example:
    *
    * // brings implicit serdes in scope
-   * import DefaultSerdes._
+   * import Serdes._
    *
    * val textLines = streamBuilder.stream[String, String](inputTopic)
    *
@@ -398,7 +398,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    *   textLines.flatMapValues(v => pattern.split(v.toLowerCase))
    *
    *     // the groupBy gets the Serialized instance through an implicit conversion of the
-   *     // serdes brought into scope through the import DefaultSerdes._ above
+   *     // serdes brought into scope through the import Serdes._ above
    *     .groupBy((k, v) => v)
    *
    *     .count()
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index e701431..113458e 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -73,7 +73,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
     // DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Serialized, Produced,
     // Consumed and Joined instances. So all APIs below that accept Serialized, Produced, Consumed or Joined will
     // get these instances automatically
-    import DefaultSerdes._
+    import Serdes._
 
     val streamsConfiguration: Properties = getStreamsConfiguration()
 
@@ -122,16 +122,16 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
 
     val streamsConfiguration: Properties = getStreamsConfiguration()
 
-    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
-    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
+    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
+    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
 
     val builder: StreamsBuilderJ = new StreamsBuilderJ()
 
     val userClicksStream: KStreamJ[String, JLong] = 
-      builder.stream[String, JLong](userClicksTopicJ, Consumed.`with`(Serdes.String(), Serdes.Long()))
+      builder.stream[String, JLong](userClicksTopicJ, Consumed.`with`(Serdes.String, Serdes.JavaLong))
 
     val userRegionsTable: KTableJ[String, String] = 
-      builder.table[String, String](userRegionsTopicJ, Consumed.`with`(Serdes.String(), Serdes.String()))
+      builder.table[String, String](userRegionsTopicJ, Consumed.`with`(Serdes.String, Serdes.String))
 
     // Join the stream against the table.
     val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream
@@ -140,7 +140,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
           def apply(clicks: JLong, region: String): (String, JLong) = 
             (if (region == null) "UNKNOWN" else region, clicks)
         }, 
-        Joined.`with`[String, JLong, String](Serdes.String(), Serdes.Long(), Serdes.String()))
+        Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String))
 
     // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
     val clicksByRegion : KStreamJ[String, JLong] = userClicksJoinRegion
@@ -152,7 +152,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
         
     // Compute the total per region by summing the individual click counts per region.
     val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
-      .groupByKey(Serialized.`with`(Serdes.String(), Serdes.Long()))
+      .groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong))
       .reduce {
         new Reducer[JLong] {
           def apply(v1: JLong, v2: JLong) = v1 + v2
@@ -160,7 +160,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
       }
         
     // Write the (continuously updating) results to the output topic.
-    clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(Serdes.String(), Serdes.Long()))
+    clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(Serdes.String, Serdes.JavaLong))
 
     val streams: KafkaStreamsJ = new KafkaStreamsJ(builder.build(), streamsConfiguration)
 
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 71d4834..9495ea7 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -52,7 +52,7 @@ class TopologyTest extends JUnitSuite {
     // build the Scala topology
     def getTopologyScala(): TopologyDescription = {
 
-      import DefaultSerdes._
+      import Serdes._
       import collection.JavaConverters._
   
       val streamBuilder = new StreamsBuilder
@@ -87,7 +87,7 @@ class TopologyTest extends JUnitSuite {
     // build the Scala topology
     def getTopologyScala(): TopologyDescription = {
 
-      import DefaultSerdes._
+      import Serdes._
       import collection.JavaConverters._
   
       val streamBuilder = new StreamsBuilder
@@ -132,7 +132,7 @@ class TopologyTest extends JUnitSuite {
 
     // build the Scala topology
     def getTopologyScala(): TopologyDescription = {
-      import DefaultSerdes._
+      import Serdes._
   
       val builder = new StreamsBuilder()
   
@@ -158,10 +158,10 @@ class TopologyTest extends JUnitSuite {
       val builder: StreamsBuilderJ = new StreamsBuilderJ()
   
       val userClicksStream: KStreamJ[String, JLong] = 
-        builder.stream[String, JLong](userClicksTopic, Consumed.`with`(Serdes.String(), Serdes.Long()))
+        builder.stream[String, JLong](userClicksTopic, Consumed.`with`(Serdes.String, Serdes.JavaLong))
   
       val userRegionsTable: KTableJ[String, String] = 
-        builder.table[String, String](userRegionsTopic, Consumed.`with`(Serdes.String(), Serdes.String()))
+        builder.table[String, String](userRegionsTopic, Consumed.`with`(Serdes.String, Serdes.String))
   
       // Join the stream against the table.
       val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream
@@ -170,7 +170,7 @@ class TopologyTest extends JUnitSuite {
             def apply(clicks: JLong, region: String): (String, JLong) = 
               (if (region == null) "UNKNOWN" else region, clicks)
           }, 
-          Joined.`with`[String, JLong, String](Serdes.String(), Serdes.Long(), Serdes.String()))
+          Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String))
   
       // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
       val clicksByRegion : KStreamJ[String, JLong] = userClicksJoinRegion
@@ -182,7 +182,7 @@ class TopologyTest extends JUnitSuite {
           
       // Compute the total per region by summing the individual click counts per region.
       val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
-        .groupByKey(Serialized.`with`(Serdes.String(), Serdes.Long()))
+        .groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong))
         .reduce {
           new Reducer[JLong] {
             def apply(v1: JLong, v2: JLong) = v1 + v2
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
index e827a3c..17fa35c 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
@@ -75,7 +75,7 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
 
   @Test def testShouldCountWords(): Unit = {
 
-    import DefaultSerdes._
+    import Serdes._
 
     val streamsConfiguration = getStreamsConfiguration()
 
@@ -112,8 +112,8 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
     import collection.JavaConverters._
 
     val streamsConfiguration = getStreamsConfiguration()
-    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
-    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
+    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
+    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
 
     val streamBuilder = new StreamsBuilderJ
     val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopicJ)
@@ -134,7 +134,7 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
 
     val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
 
-    wordCounts.toStream.to(outputTopicJ, Produced.`with`(Serdes.String(), Serdes.Long()))
+    wordCounts.toStream.to(outputTopicJ, Produced.`with`(Serdes.String, Serdes.JavaLong))
 
     val streams: KafkaStreamsJ = new KafkaStreamsJ(streamBuilder.build(), streamsConfiguration)
     streams.start()
@@ -153,7 +153,7 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
     streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000")
     streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath())
+    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot.getPath)
     streamsConfiguration
   }
 
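
As a recap of the renamed entry points in the diff above, the sketch below shows the two ways the new `Serdes` object appears to be intended for use: explicitly with the Java API (where the boxed `JavaLong` variant is needed, as in the updated tests) and implicitly via `import Serdes._`. It is an illustration only, not part of the commit; `SerdesLookupSketch` and `summon` are hypothetical names, and it assumes a kafka-streams-scala artifact built from this commit is on the classpath.

```scala
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{Consumed => ConsumedJ}
import org.apache.kafka.streams.scala.Serdes

object SerdesLookupSketch {
  // Explicit use with the Java DSL, mirroring the updated tests:
  // the Java API is typed on java.lang.Long, so JavaLong is passed rather than Long.
  val consumedJ: ConsumedJ[String, java.lang.Long] =
    ConsumedJ.`with`(Serdes.String, Serdes.JavaLong)

  // Implicit use: with the members of Serdes imported, a Serde is resolved by type.
  def summon(): (Serde[String], Serde[Long]) = {
    import Serdes._
    (implicitly[Serde[String]], implicitly[Serde[Long]])
  }
}
```

Keeping the wildcard import inside a narrow scope, as in `summon`, limits where the imported values named `String`, `Long`, `Float` and `Double` are visible as terms.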

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.
