From: tdas@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-18497][SS] Make ForeachSink support watermark
Date: Sat, 19 Nov 2016 00:34:51 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/branch-2.1 4b1df0e89 -> b4bad04c5


[SPARK-18497][SS] Make ForeachSink support watermark

## What changes were proposed in this pull request?

The issue in ForeachSink is that the newly created Dataset still uses the old QueryExecution. When `foreachPartition` is called, `QueryExecution.toString` is invoked and fails because it does not know how to plan EventTimeWatermark. This PR simply replaces the QueryExecution with IncrementalExecution to fix the issue.

## How was this patch tested?

`test("foreach with watermark")`.

Author: Shixiong Zhu

Closes #15934 from zsxwing/SPARK-18497.
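For context, the failure described above surfaces in user code that combines `withWatermark` with a foreach sink. The sketch below is illustrative only and is not part of this patch: the socket source, host/port, epoch-second input format, and the trivial ForeachWriter are all assumptions made for the example.

```scala
import org.apache.spark.sql.{ForeachWriter, SparkSession}
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.streaming.OutputMode

object ForeachWithWatermarkExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("foreach-with-watermark")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical source: a local socket whose lines are event times in epoch seconds.
    val events = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .withColumn("eventTime", $"value".cast("long").cast("timestamp"))

    // withWatermark inserts an EventTimeWatermark node into the logical plan; before this
    // fix, the plain QueryExecution used inside ForeachSink could not plan that node.
    val counts = events
      .withWatermark("eventTime", "10 seconds")
      .groupBy(window($"eventTime", "5 seconds"))
      .agg(count("*").as("count"))
      .select($"count".as[Long])

    val query = counts.writeStream
      .outputMode(OutputMode.Complete)
      .foreach(new ForeachWriter[Long] {
        override def open(partitionId: Long, version: Long): Boolean = true
        override def process(value: Long): Unit = println(s"window count: $value")
        override def close(errorOrNull: Throwable): Unit = ()
      })
      .start()

    query.awaitTermination()
  }
}
```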
(cherry picked from commit 2a40de408b5eb47edba92f9fe92a42ed1e78bf98)
Signed-off-by: Tathagata Das


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4bad04c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4bad04c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4bad04c

Branch: refs/heads/branch-2.1
Commit: b4bad04c5e20b06992100c1d44ece9d3a5b4f817
Parents: 4b1df0e
Author: Shixiong Zhu
Authored: Fri Nov 18 16:34:38 2016 -0800
Committer: Tathagata Das
Committed: Fri Nov 18 16:34:48 2016 -0800

----------------------------------------------------------------------
 .../sql/execution/streaming/ForeachSink.scala   | 16 ++++----
 .../execution/streaming/ForeachSinkSuite.scala  | 35 ++++++++++++++++++++
 2 files changed, 43 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b4bad04c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
index f5c550d..c93fcfb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
@@ -47,22 +47,22 @@ class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Seria
     // method supporting incremental planning. But in the long run, we should generally make newly
     // created Datasets use `IncrementalExecution` where necessary (which is SPARK-16264 tries to
     // resolve).
-
+    val incrementalExecution = data.queryExecution.asInstanceOf[IncrementalExecution]
     val datasetWithIncrementalExecution =
-      new Dataset(data.sparkSession, data.logicalPlan, implicitly[Encoder[T]]) {
+      new Dataset(data.sparkSession, incrementalExecution, implicitly[Encoder[T]]) {
         override lazy val rdd: RDD[T] = {
           val objectType = exprEnc.deserializer.dataType
           val deserialized = CatalystSerde.deserialize[T](logicalPlan)
 
           // was originally: sparkSession.sessionState.executePlan(deserialized) ...
-          val incrementalExecution = new IncrementalExecution(
+          val newIncrementalExecution = new IncrementalExecution(
             this.sparkSession,
             deserialized,
-            data.queryExecution.asInstanceOf[IncrementalExecution].outputMode,
-            data.queryExecution.asInstanceOf[IncrementalExecution].checkpointLocation,
-            data.queryExecution.asInstanceOf[IncrementalExecution].currentBatchId,
-            data.queryExecution.asInstanceOf[IncrementalExecution].currentEventTimeWatermark)
-          incrementalExecution.toRdd.mapPartitions { rows =>
+            incrementalExecution.outputMode,
+            incrementalExecution.checkpointLocation,
+            incrementalExecution.currentBatchId,
+            incrementalExecution.currentEventTimeWatermark)
+          newIncrementalExecution.toRdd.mapPartitions { rows =>
             rows.map(_.get(0, objectType))
           }.asInstanceOf[RDD[T]]
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/b4bad04c/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
index 9e05921..ee626103 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.ForeachWriter
+import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, StreamTest}
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -169,6 +170,40 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
       assert(errorEvent.error.get.getMessage === "error")
     }
   }
+
+  test("foreach with watermark") {
+    val inputData = MemoryStream[Int]
+
+    val windowedAggregation = inputData.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .groupBy(window($"eventTime", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"count".as[Long])
+      .map(_.toInt)
+      .repartition(1)
+
+    val query = windowedAggregation
+      .writeStream
+      .outputMode(OutputMode.Complete)
+      .foreach(new TestForeachWriter())
+      .start()
+    try {
+      inputData.addData(10, 11, 12)
+      query.processAllAvailable()
+
+      val allEvents = ForeachSinkSuite.allEvents()
+      assert(allEvents.size === 1)
+      val expectedEvents = Seq(
+        ForeachSinkSuite.Open(partition = 0, version = 0),
+        ForeachSinkSuite.Process(value = 3),
+        ForeachSinkSuite.Close(None)
+      )
+      assert(allEvents === Seq(expectedEvents))
+    } finally {
+      query.stop()
+    }
+  }
 }
 
 /** A global object to collect events in the executor */
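Note: `TestForeachWriter` and the `ForeachSinkSuite.Open/Process/Close` events used by the new test are defined elsewhere in ForeachSinkSuite and do not appear in this diff. A rough, hypothetical sketch of a writer with the same event-recording behavior (not the suite's actual helper; all names here are made up) could look like the following.

```scala
import scala.collection.mutable

import org.apache.spark.sql.ForeachWriter

// Hypothetical global event log, similar in spirit to the suite's ForeachSinkSuite object.
// It only observes events correctly in local mode, where executors share the driver JVM.
object RecordedEvents {
  sealed trait Event
  case class Open(partition: Long, version: Long) extends Event
  case class Process(value: Int) extends Event
  case class Close(error: Option[Throwable]) extends Event

  private val events = mutable.ArrayBuffer[Seq[Event]]()

  def allEvents(): Seq[Seq[Event]] = events.synchronized(events.toVector)
  def clear(): Unit = events.synchronized(events.clear())
  def record(batch: Seq[Event]): Unit = events.synchronized { events += batch }
}

// Hypothetical writer: buffers the callbacks for one open/close cycle, then publishes them
// so a test can assert on the exact sequence of open/process/close events per partition.
class RecordingForeachWriter extends ForeachWriter[Int] {
  import RecordedEvents._

  private var current: mutable.ArrayBuffer[Event] = _

  override def open(partitionId: Long, version: Long): Boolean = {
    current = mutable.ArrayBuffer[Event](Open(partitionId, version))
    true // true means "process this partition for this version"
  }

  override def process(value: Int): Unit = current += Process(value)

  override def close(errorOrNull: Throwable): Unit = {
    current += Close(Option(errorOrNull))
    record(current.toVector)
  }
}
```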