From commits-return-31341-archive-asf-public=cust-asf.ponee.io@spark.apache.org Fri May 4 10:02:28 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 69222180634 for ; Fri, 4 May 2018 10:02:28 +0200 (CEST) Received: (qmail 71421 invoked by uid 500); 4 May 2018 08:02:27 -0000 Mailing-List: contact commits-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list commits@spark.apache.org Received: (qmail 71412 invoked by uid 99); 4 May 2018 08:02:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 May 2018 08:02:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C86E1F325A; Fri, 4 May 2018 08:02:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jshao@apache.org To: commits@spark.apache.org Message-Id: <5ac81b7e8f184ed68d2c956e4899f805@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: spark git commit: [SPARK-24136][SS] Fix MemoryStreamDataReader.next to skip sleeping if record is available Date: Fri, 4 May 2018 08:02:26 +0000 (UTC) Repository: spark Updated Branches: refs/heads/master 0c23e254c -> 7f1b6b182 [SPARK-24136][SS] Fix MemoryStreamDataReader.next to skip sleeping if record is available ## What changes were proposed in this pull request? Avoid unnecessary sleep (10 ms) in each invocation of MemoryStreamDataReader.next. ## How was this patch tested? Ran ContinuousSuite from IDE. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Arun Mahadevan Closes #21207 from arunmahadevan/memorystream. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f1b6b18 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f1b6b18 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f1b6b18 Branch: refs/heads/master Commit: 7f1b6b182e3cf3cbf29399e7bfbe03fa869e0bc8 Parents: 0c23e25 Author: Arun Mahadevan Authored: Fri May 4 16:02:21 2018 +0800 Committer: jerryshao Committed: Fri May 4 16:02:21 2018 +0800 ---------------------------------------------------------------------- .../streaming/sources/ContinuousMemoryStream.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7f1b6b18/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala index c28919b..a8fca3c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala @@ -183,11 +183,10 @@ class ContinuousMemoryStreamDataReader( private var current: Option[Row] = None override def next(): Boolean = { - current = None + current = getRecord while (current.isEmpty) { Thread.sleep(10) - current = endpoint.askSync[Option[Row]]( - GetRecord(ContinuousMemoryStreamPartitionOffset(partition, currentOffset))) + current = getRecord } currentOffset += 1 true @@ -199,6 +198,10 @@ class ContinuousMemoryStreamDataReader( override def getOffset: ContinuousMemoryStreamPartitionOffset = ContinuousMemoryStreamPartitionOffset(partition, currentOffset) + + private def getRecord: Option[Row] = + endpoint.askSync[Option[Row]]( + GetRecord(ContinuousMemoryStreamPartitionOffset(partition, currentOffset))) } case class ContinuousMemoryStreamOffset(partitionNums: Map[Int, Int]) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org