From commits-return-9470-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Mon May 7 05:55:27 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 49D32180674 for ; Mon, 7 May 2018 05:55:27 +0200 (CEST) Received: (qmail 16036 invoked by uid 500); 7 May 2018 03:55:24 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 15857 invoked by uid 99); 7 May 2018 03:55:24 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 07 May 2018 03:55:24 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 690AF8091A; Mon, 7 May 2018 03:55:23 +0000 (UTC) Date: Mon, 07 May 2018 03:55:23 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch trunk updated: MINOR: Build and code sample updates for Kafka Streams DSL for Scala (#4949) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <152566532284.8360.7486810305330669647@gitbox.apache.org> From: guozhang@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/trunk X-Git-Reftype: branch X-Git-Oldrev: 564311f5cdedd63fa03052368c321db5ce15dc4f X-Git-Newrev: 893e0445150614d3538654af0e25f78d87a717ba X-Git-Rev: 893e0445150614d3538654af0e25f78d87a717ba X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 893e044 MINOR: Build and code sample updates for Kafka Streams DSL for Scala (#4949) 893e044 is described below commit 893e0445150614d3538654af0e25f78d87a717ba Author: Sean Glover AuthorDate: Sun May 6 23:55:12 2018 -0400 MINOR: Build and code sample updates for Kafka Streams DSL for Scala (#4949) Several build and documentation updates were required after the merge of KAFKA-6670: Implement a Scala wrapper library for Kafka Streams. Encode Scala major version into streams-scala artifacts. To differentiate versions of the kafka-streams-scala artifact across Scala major versions it's required to encode the version into the artifact name before it's published to a maven repository. This is accomplished by following a similar release process as kafka core, which encodes the Scala major version and then runs the build for each major version of Scala supported. This is considered standard practice when releasing Scala libraries, but is not handled for us automatically with th [...] 
After this change you can generate and install the kafka-streams-scala artifact into the local maven repository: $ ./gradlew -PscalaVersion=2.11 install $ ./gradlew -PscalaVersion=2.12 install Reviewers: Ismael Juma , Guozhang Wang --- build.gradle | 3 +-- docs/streams/developer-guide/dsl-api.html | 14 ++++++++------ docs/streams/index.html | 7 ++++--- ...reamToTableJoinScalaIntegrationTestImplicitSerdes.scala | 3 +-- .../org/apache/kafka/streams/scala/TopologyTest.scala | 3 +-- .../org/apache/kafka/streams/scala/WordCountTest.scala | 3 +-- 6 files changed, 16 insertions(+), 17 deletions(-) diff --git a/build.gradle b/build.gradle index d60ca8f..31026d9 100644 --- a/build.gradle +++ b/build.gradle @@ -981,7 +981,7 @@ project(':streams') { project(':streams:streams-scala') { println "Building project 'streams-scala' with Scala version ${versions.scala}" apply plugin: 'scala' - archivesBaseName = "kafka-streams-scala" + archivesBaseName = "kafka-streams-scala_${versions.baseScala}" dependencies { compile project(':streams') @@ -992,7 +992,6 @@ project(':streams:streams-scala') { testCompile project(':core').sourceSets.test.output testCompile project(':streams').sourceSets.test.output testCompile project(':clients').sourceSets.test.output - testCompile libs.scalaLogging testCompile libs.junit testCompile libs.scalatest diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html index ce60654..2b25072 100644 --- a/docs/streams/developer-guide/dsl-api.html +++ b/docs/streams/developer-guide/dsl-api.html @@ -3191,11 +3191,13 @@ import java.util.Properties import java.util.concurrent.TimeUnit import org.apache.kafka.streams.kstream.Materialized +import org.apache.kafka.streams.scala.ImplicitConversions._ +import org.apache.kafka.streams.scala._ +import org.apache.kafka.streams.scala.kstream._ import org.apache.kafka.streams.{KafkaStreams, StreamsConfig} object WordCountApplication extends App { import DefaultSerdes._ - import 
ImplicitConversions._ val config: Properties = { val p = new Properties() @@ -3204,9 +3206,9 @@ object WordCountApplication extends App { p } - val builder = new StreamsBuilder() - val textLines = builder.stream[String, String]("TextLinesTopic") - val wordCounts = textLines + val builder: StreamsBuilder = new StreamsBuilder + val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic") + val wordCounts: KTable[String, Long] = textLines .flatMapValues(textLine => textLine.toLowerCase.split("\\W+")) .groupBy((_, word) => word) .count(Materialized.as("counts-store")) @@ -3216,7 +3218,7 @@ object WordCountApplication extends App { streams.start() sys.ShutdownHookThread { - streams.close(10, TimeUnit.SECONDS) + streams.close(10, TimeUnit.SECONDS) } } @@ -3290,7 +3292,7 @@ val clicksPerRegion: KTable[String, Long] = // Join the stream against the table. .leftJoin(userRegionsTable, (clicks: UserClicks, region: String) => (if (region == null) "UNKNOWN" else region, clicks.clicks)) - // Change the stream from -> to -> + // Change the stream from <user> -> <region, clicks> to <region> -> <clicks> .map((_, regionWithClicks) => regionWithClicks) // Compute the total per region by summing the individual click counts per region. 
diff --git a/docs/streams/index.html b/docs/streams/index.html index 8992fc5..72e1323 100644 --- a/docs/streams/index.html +++ b/docs/streams/index.html @@ -255,12 +255,13 @@ import java.util.Properties import java.util.concurrent.TimeUnit import org.apache.kafka.streams.kstream.Materialized +import org.apache.kafka.streams.scala.ImplicitConversions._ +import org.apache.kafka.streams.scala._ import org.apache.kafka.streams.scala.kstream._ import org.apache.kafka.streams.{KafkaStreams, StreamsConfig} object WordCountApplication extends App { import DefaultSerdes._ - import ImplicitConversions._ val config: Properties = { val p = new Properties() @@ -269,7 +270,7 @@ object WordCountApplication extends App { p } - val builder: StreamsBuilder = new StreamsBuilder() + val builder: StreamsBuilder = new StreamsBuilder val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic") val wordCounts: KTable[String, Long] = textLines .flatMapValues(textLine => textLine.toLowerCase.split("\\W+")) @@ -281,7 +282,7 @@ object WordCountApplication extends App { streams.start() sys.ShutdownHookThread { - streams.close(10, TimeUnit.SECONDS) + streams.close(10, TimeUnit.SECONDS) } } diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala index 24974c4..e701431 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala @@ -34,7 +34,6 @@ import org.apache.kafka.streams._ import org.apache.kafka.streams.scala.kstream._ import ImplicitConversions._ -import com.typesafe.scalalogging.LazyLogging /** * Test suite that does an example to demonstrate 
stream-table joins in Kafka Streams @@ -46,7 +45,7 @@ import com.typesafe.scalalogging.LazyLogging * Hence the native Java API based version is more verbose. */ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite - with StreamToTableJoinTestData with LazyLogging { + with StreamToTableJoinTestData { private val privateCluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1) diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala index 89b2935..71d4834 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala @@ -31,7 +31,6 @@ import org.apache.kafka.streams.scala.kstream._ import org.apache.kafka.common.serialization._ import ImplicitConversions._ -import com.typesafe.scalalogging.LazyLogging import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder => StreamsBuilderJ, _} import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream => KStreamJ, KGroupedStream => KGroupedStreamJ, _} @@ -40,7 +39,7 @@ import collection.JavaConverters._ /** * Test suite that verifies that the topology built by the Java and Scala APIs match. 
*/ -class TopologyTest extends JUnitSuite with LazyLogging { +class TopologyTest extends JUnitSuite { val inputTopic = "input-topic" val userClicksTopic = "user-clicks-topic" diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala index f71e0cb..e827a3c 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala @@ -40,7 +40,6 @@ import org.apache.kafka.common.utils.MockTime import org.apache.kafka.test.TestUtils import ImplicitConversions._ -import com.typesafe.scalalogging.LazyLogging /** * Test suite that does a classic word count example. @@ -51,7 +50,7 @@ import com.typesafe.scalalogging.LazyLogging * Note: In the current project settings SAM type conversion is turned off as it's experimental in Scala 2.11. * Hence the native Java API based version is more verbose. */ -class WordCountTest extends JUnitSuite with WordCountTestData with LazyLogging { +class WordCountTest extends JUnitSuite with WordCountTestData { private val privateCluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1) -- To stop receiving notification emails like this one, please contact guozhang@apache.org.