From: pwendell@apache.org
To: commits@spark.incubator.apache.org
Date: Tue, 15 Oct 2013 05:27:16 -0000
Subject: [30/37] git commit: Making takeAsync and collectAsync deterministic.

Making takeAsync and collectAsync deterministic.
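The collectAsync change below is the heart of the fix: instead of appending each partition's output to a shared buffer as tasks finish (so the order depends on scheduling), results are written into a slot keyed by partition index and flattened at the end. A minimal, self-contained Scala sketch of that pattern, using hypothetical data and names rather than Spark code:

    // Illustrative only; mirrors the collectAsync change in the diff below.
    import scala.collection.mutable.ArrayBuffer

    object DeterministicMergeSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical per-partition outputs; index = partition id.
        val partitionData = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9))
        // Tasks may finish in any order; simulate one such order.
        val completionOrder = Seq(2, 0, 1)

        // Old approach: append each result as it arrives, so the output order
        // depends on task completion order.
        val appended = new ArrayBuffer[Int]
        completionOrder.foreach(i => appended ++= partitionData(i))

        // New approach: write each result into a slot keyed by partition index,
        // then flatten once everything has arrived.
        val slots = new Array[Array[Int]](partitionData.length)
        completionOrder.foreach(i => slots(i) = partitionData(i))
        val deterministic = slots.flatten.toSeq

        println("append on arrival:  " + appended.mkString(", "))      // 7, 8, 9, 1, 2, 3, 4, 5, 6
        println("index by partition: " + deterministic.mkString(", ")) // 1, 2, 3, 4, 5, 6, 7, 8, 9
      }
    }

The same idea is applied inside takeAsync's per-iteration runJob call, which now collects each scanned partition into an index-keyed buffer and merges it in partition order before appending to the running result.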
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e2047d39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e2047d39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e2047d39

Branch: refs/heads/master
Commit: e2047d3927e0032cc1d6de3fd09d00f96ce837d0
Parents: 09f7609
Author: Reynold Xin
Authored: Fri Oct 11 13:04:45 2013 -0700
Committer: Reynold Xin
Committed: Fri Oct 11 13:04:45 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/FutureAction.scala   |  4 ----
 .../org/apache/spark/rdd/AsyncRDDActions.scala  | 20 +++++++++++---------
 .../apache/spark/rdd/AsyncRDDActionsSuite.scala | 10 ++++------
 3 files changed, 15 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2047d39/core/src/main/scala/org/apache/spark/FutureAction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 9f41912..eab2957 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -177,10 +177,6 @@ class CancellablePromise[T] extends FutureAction[T] with Promise[T] {
   def run(func: => T)(implicit executor: ExecutionContext): Unit = scala.concurrent.future {
     thread = Thread.currentThread
     try {
-      if (cancelled) {
-        // This action has been cancelled before this thread even started running.
-        this.failure(new SparkException("action cancelled"))
-      }
       this.success(func)
     } catch {
       case e: Exception => this.failure(e)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2047d39/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 5798324..32af795 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -54,9 +54,9 @@ class AsyncRDDActions[T: ClassManifest](self: RDD[T]) extends Serializable with
    * Return a future for retrieving all elements of this RDD.
    */
   def collectAsync(): FutureAction[Seq[T]] = {
-    val results = new ArrayBuffer[T]
+    val results = new Array[Array[T]](self.partitions.size)
     self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.size),
-      (index, data) => results ++= data, results)
+      (index, data) => results(index) = data, results.flatten.toSeq)
   }
 
   /**
@@ -66,10 +66,10 @@ class AsyncRDDActions[T: ClassManifest](self: RDD[T]) extends Serializable with
     val promise = new CancellablePromise[Seq[T]]
 
     promise.run {
-      val buf = new ArrayBuffer[T](num)
+      val results = new ArrayBuffer[T](num)
       val totalParts = self.partitions.length
       var partsScanned = 0
-      while (buf.size < num && partsScanned < totalParts) {
+      while (results.size < num && partsScanned < totalParts) {
         // The number of partitions to try in this iteration. It is ok for this number to be
         // greater than totalParts because we actually cap it at totalParts in runJob.
         var numPartsToTry = 1
@@ -77,26 +77,28 @@ class AsyncRDDActions[T: ClassManifest](self: RDD[T]) extends Serializable with
           // If we didn't find any rows after the first iteration, just try all partitions next.
           // Otherwise, interpolate the number of partitions we need to try, but overestimate it
           // by 50%.
-          if (buf.size == 0) {
+          if (results.size == 0) {
             numPartsToTry = totalParts - 1
           } else {
-            numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt
+            numPartsToTry = (1.5 * num * partsScanned / results.size).toInt
           }
         }
         numPartsToTry = math.max(0, numPartsToTry)  // guard against negative num of partitions
 
-        val left = num - buf.size
+        val left = num - results.size
         val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
+        val buf = new Array[Array[T]](p.size)
         promise.runJob(self,
           (it: Iterator[T]) => it.take(left).toArray,
           p,
-          (index: Int, data: Array[T]) => buf ++= data.take(num - buf.size),
+          (index: Int, data: Array[T]) => buf(index) = data,
           Unit)
+        buf.foreach(results ++= _.take(num - results.size))
 
         partsScanned += numPartsToTry
       }
-      buf.toSeq
+      results.toSeq
     }
 
     promise.future

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2047d39/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
index 131e246..3ef000d 100644
--- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
@@ -53,8 +53,7 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll {
 
   test("collectAsync") {
     assert(zeroPartRdd.collectAsync().get() === Seq.empty)
-    // Note that we sort the collected output because the order is indeterministic.
-    val collected = sc.parallelize(1 to 1000, 3).collectAsync().get().sorted
+    val collected = sc.parallelize(1 to 1000, 3).collectAsync().get()
     assert(collected === (1 to 1000))
   }
 
@@ -80,10 +79,9 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll {
 
   test("takeAsync") {
     def testTake(rdd: RDD[Int], input: Seq[Int], num: Int) {
-      // Note that we sort the collected output because the order is indeterministic.
-      val expected = input.take(num).size
-      val saw = rdd.takeAsync(num).get().size
-      assert(saw == expected, "incorrect result for rdd with %d partitions (expected %d, saw %d)"
+      val expected = input.take(num)
+      val saw = rdd.takeAsync(num).get()
+      assert(saw == expected, "incorrect result for rdd with %d partitions (expected %s, saw %s)"
         .format(rdd.partitions.size, expected, saw))
     }
     val input = Range(1, 1000)
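For reference, a usage sketch of what the updated tests above rely on: with this change the async variants return elements in partition order, so a caller can compare them directly against the input without sorting. It assumes a running SparkContext named `sc` (for example in the Spark shell or a test fixture) and that the implicit conversion adding the async methods to RDDs is in scope (typically via `import org.apache.spark.SparkContext._` at the time); illustrative only, not part of the commit.

    // Sketch; assumes `sc: SparkContext` already exists and the RDD -> AsyncRDDActions
    // implicit is provided by this import (an assumption, not shown in the diff).
    import org.apache.spark.SparkContext._

    val rdd = sc.parallelize(1 to 1000, 3)

    // With this commit the future's result is in partition order, so no .sorted is needed.
    val collected: Seq[Int] = rdd.collectAsync().get()
    assert(collected == (1 to 1000))

    // takeAsync(n) likewise yields the first n elements in order.
    val firstTen: Seq[Int] = rdd.takeAsync(10).get()
    assert(firstTen == (1 to 10))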