From commits-return-10168-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Thu Aug 9 19:13:12 2018
Date: Thu, 09 Aug 2018 17:12:54 +0000
From: guozhang@apache.org
To: "commits@kafka.apache.org"
Reply-To: dev@kafka.apache.org
Subject: [kafka] branch 2.0 updated: KAFKA-7250: switch scala transform to TransformSupplier (#5481)
Message-ID: <153383477356.24568.7211616376446632076@gitbox.apache.org>
X-Git-Host: gitbox.apache.org
X-Git-Repo: kafka
X-Git-Refname: refs/heads/2.0
X-Git-Reftype: branch
X-Git-Oldrev: 0cb3393043688058bccf1b979701b1469661d395
X-Git-Newrev: 1f7c4e9a55878deb8d790d5442b6f15b06372cf1
X-Git-Rev: 1f7c4e9a55878deb8d790d5442b6f15b06372cf1
X-Git-NotificationType: ref_changed_plus_diff
X-Git-Multimail-Version: 1.5.dev
Auto-Submitted: auto-generated

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 1f7c4e9  KAFKA-7250: switch scala transform to TransformSupplier (#5481)
1f7c4e9 is described below

commit 1f7c4e9a55878deb8d790d5442b6f15b06372cf1
Author: John Roesler
AuthorDate: Thu Aug 9 12:11:48 2018 -0500

    KAFKA-7250: switch scala transform to TransformSupplier (#5481)

    #5468 introduced a breaking API change that was actually avoidable.
    This PR re-introduces the old API as deprecated and alters the API
    introduced by #5468 to be consistent with the other methods.
    Also, fixed misc syntax problems.
---
 build.gradle                                       |  1 +
 .../kafka/streams/scala/kstream/KStream.scala      | 30 ++++++++++++----------
 .../apache/kafka/streams/scala/TopologyTest.scala  | 28 ++++++++++----------
 3 files changed, 31 insertions(+), 28 deletions(-)
diff --git a/build.gradle b/build.gradle
index 6d7c325..ea2e9de 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1011,6 +1011,7 @@ project(':streams:streams-scala') {
 
     testCompile libs.junit
     testCompile libs.scalatest
+    testCompile libs.easymock
 
     testRuntime libs.slf4jlog4j
   }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index a8766bd..adc1850 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -22,7 +22,7 @@ package kstream
 
 import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _}
-import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier, TopicNameExtractor}
+import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier, TopicNameExtractor}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionConversions._
 
@@ -31,8 +31,8 @@ import scala.collection.JavaConverters._
 /**
  * Wraps the Java class [[org.apache.kafka.streams.kstream.KStream]] and delegates method calls to the underlying Java object.
  *
- * @param [K] Type of keys
- * @param [V] Type of values
+ * @tparam K Type of keys
+ * @tparam V Type of values
  * @param inner The underlying Java abstraction for KStream
  *
  * @see `org.apache.kafka.streams.kstream.KStream`
@@ -167,7 +167,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   def print(printed: Printed[K, V]): Unit = inner.print(printed)
 
   /**
-   * Perform an action on each record of 'KStream`
+   * Perform an action on each record of `KStream`
    *
    * @param action an action to perform on each record
    * @see `org.apache.kafka.streams.kstream.KStream#foreach`
@@ -176,14 +176,15 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
     inner.foreach((k: K, v: V) => action(k, v))
 
   /**
-   * Creates an array of {@code KStream} from this stream by branching the records in the original stream based on
+   * Creates an array of `KStream` from this stream by branching the records in the original stream based on
    * the supplied predicates.
    *
    * @param predicates the ordered list of functions that return a Boolean
    * @return multiple distinct substreams of this [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#branch`
    */
-  def branch(predicates: (K, V) => Boolean*): Array[KStream[K, V]] =
+  //noinspection ScalaUnnecessaryParentheses
+  def branch(predicates: ((K, V) => Boolean)*): Array[KStream[K, V]] =
     inner.branch(predicates.map(_.asPredicate): _*).map(kstream => wrapKStream(kstream))
 
   /**
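The added parentheses let each vararg predicate be written as a plain (K, V) => Boolean function literal at call sites. A minimal caller-side sketch against the 2.0 Scala DSL; the topic and value names are hypothetical:

    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.Serdes._
    import org.apache.kafka.streams.scala.StreamsBuilder
    import org.apache.kafka.streams.scala.kstream.KStream

    object BranchExample {
      val builder = new StreamsBuilder
      val numbers: KStream[String, Long] = builder.stream[String, Long]("numbers")

      // Records are routed to the first substream whose predicate returns true;
      // each predicate is an ordinary (K, V) => Boolean literal.
      val Array(positives, rest) = numbers.branch(
        (_, v) => v > 0,
        (_, _) => true
      )
    }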
@@ -211,7 +212,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * }}}
    *
    * @param topic the topic name
-   * @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
+   * @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
    * @return a [[KStream]] that contains the exact same (and potentially repartitioned) records as this [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#through`
    */
@@ -243,7 +244,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * }}}
    *
    * @param topic the topic name
-   * @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
+   * @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
    * @see `org.apache.kafka.streams.kstream.KStream#to`
    */
   def to(topic: String)(implicit produced: Produced[K, V]): Unit =
@@ -275,7 +276,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * }}}
    *
    * @param extractor the extractor to determine the name of the Kafka topic to write to for reach record
-   * @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
+   * @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
    * @see `org.apache.kafka.streams.kstream.KStream#to`
    */
   def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, V]): Unit =
@@ -295,9 +296,9 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transform`
    */
-  def transform[K1, V1](transformerSupplier: () => Transformer[K, V, KeyValue[K1, V1]],
+  def transform[K1, V1](transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]],
                         stateStoreNames: String*): KStream[K1, V1] =
-    inner.transform(transformerSupplier.asTransformerSupplier, stateStoreNames: _*)
+    inner.transform(transformerSupplier, stateStoreNames: _*)
 
   /**
    * Transform the value of each input record into a new value (with possible new type) of the output record.
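For callers, this hunk is the heart of the change: the supplier argument is now the Java TransformerSupplier itself rather than a () => Transformer thunk (the removed line adapted the thunk via .asTransformerSupplier). A minimal, stateless caller-side sketch; topic names, the object name, and the lowercasing logic are illustrative only:

    import org.apache.kafka.streams.KeyValue
    import org.apache.kafka.streams.kstream.{Transformer, TransformerSupplier}
    import org.apache.kafka.streams.processor.ProcessorContext
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.Serdes._
    import org.apache.kafka.streams.scala.StreamsBuilder

    object TransformExample {
      val builder = new StreamsBuilder
      val lines = builder.stream[String, String]("lines")

      // The supplier must return a fresh Transformer for every stream task.
      val lowercased = lines.transform(
        new TransformerSupplier[String, String, KeyValue[String, String]] {
          override def get(): Transformer[String, String, KeyValue[String, String]] =
            new Transformer[String, String, KeyValue[String, String]] {
              override def init(context: ProcessorContext): Unit = ()
              override def transform(key: String, value: String): KeyValue[String, String] =
                new KeyValue(key, value.toLowerCase)
              override def close(): Unit = ()
            }
        }
      )

      lowercased.to("lowercased-lines")
    }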
@@ -337,11 +338,12 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * In order to assign a state, the state must be created and registered
    * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
    *
-   * @param processorSupplier a function that generates a [[org.apache.kafka.stream.Processor]]
+   * @param processorSupplier a function that generates a [[org.apache.kafka.streams.processor.Processor]]
    * @param stateStoreNames the names of the state store used by the processor
    * @see `org.apache.kafka.streams.kstream.KStream#process`
    */
   def process(processorSupplier: () => Processor[K, V], stateStoreNames: String*): Unit = {
+    //noinspection ConvertExpressionToSAM // because of the 2.11 build
     val processorSupplierJ: ProcessorSupplier[K, V] = new ProcessorSupplier[K, V] {
       override def get(): Processor[K, V] = processorSupplier()
     }
@@ -374,7 +376,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * // to the groupByKey call
    * }}}
    *
-   * @param (implicit) serialized the instance of Serialized that gives the serdes
+   * @param serialized the instance of Serialized that gives the serdes
    * @return a [[KGroupedStream]] that contains the grouped records of the original [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#groupByKey`
    */
@@ -564,7 +566,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   def merge(stream: KStream[K, V]): KStream[K, V] = inner.merge(stream.inner)
 
   /**
-   * Perform an action on each record of {@code KStream}.
+   * Perform an action on each record of `KStream`.
    *
    * Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection)
    * and returns an unchanged stream.
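Note the added //noinspection ConvertExpressionToSAM hint: the anonymous ProcessorSupplier cannot be replaced with a lambda because the project also builds against Scala 2.11, where SAM conversion is not enabled by default. The function-based process API itself is unchanged; a caller-side sketch with hypothetical names:

    import org.apache.kafka.streams.processor.{Processor, ProcessorContext}
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.Serdes._
    import org.apache.kafka.streams.scala.StreamsBuilder

    object ProcessExample {
      val builder = new StreamsBuilder
      val events = builder.stream[String, String]("events")

      // process is terminal: the Processor consumes records for side effects only.
      events.process(() => new Processor[String, String] {
        override def init(context: ProcessorContext): Unit = ()
        override def process(key: String, value: String): Unit =
          println(s"$key -> $value")
        override def close(): Unit = ()
      })
    }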
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 8a0eabb..b596dd3 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -21,19 +21,16 @@ package org.apache.kafka.streams.scala
 
 import java.util.regex.Pattern
 
-import org.scalatest.junit.JUnitSuite
-import org.junit.Assert._
-import org.junit._
-
+import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, KStream => KStreamJ, KTable => KTableJ, _}
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.kstream._
-
-import ImplicitConversions._
-
 import org.apache.kafka.streams.{StreamsBuilder => StreamsBuilderJ, _}
-import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream => KStreamJ, KGroupedStream => KGroupedStreamJ, _}
-import org.apache.kafka.streams.processor.ProcessorContext
+import org.junit.Assert._
+import org.junit._
+import org.scalatest.junit.JUnitSuite
 
-import collection.JavaConverters._
+import _root_.scala.collection.JavaConverters._
 
 /**
  * Test suite that verifies that the topology built by the Java and Scala APIs match.
@@ -207,17 +204,20 @@ class TopologyTest extends JUnitSuite {
     val streamBuilder = new StreamsBuilder
     val textLines = streamBuilder.stream[String, String](inputTopic)
 
+    //noinspection ConvertExpressionToSAM due to 2.11 build
     val _: KTable[String, Long] = textLines
-      .transform(
-        () =>
+      .transform(new TransformerSupplier[String, String, KeyValue[String, String]] {
+        override def get(): Transformer[String, String, KeyValue[String, String]] =
           new Transformer[String, String, KeyValue[String, String]] {
             override def init(context: ProcessorContext): Unit = Unit
+
             override def transform(key: String, value: String): KeyValue[String, String] =
               new KeyValue(key, value.toLowerCase)
+
            override def close(): Unit = Unit
-          }
-      )
+          }
+      })
       .groupBy((k, v) => v)
       .count()
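The test spells out the TransformerSupplier as an anonymous class for the same Scala 2.11 reason. Under Scala 2.12, where SAM conversion is on by default, the supplier could presumably be passed as a thunk; a sketch continuing the test's textLines value, untested against the 2.11 toolchain:

    // Scala 2.12+ only: SAM conversion adapts the thunk to a TransformerSupplier.
    val lowercased = textLines
      .transform[String, String](() =>
        new Transformer[String, String, KeyValue[String, String]] {
          override def init(context: ProcessorContext): Unit = ()
          override def transform(key: String, value: String): KeyValue[String, String] =
            new KeyValue(key, value.toLowerCase)
          override def close(): Unit = ()
        })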