spark-commits mailing list archives

From marmb...@apache.org
Subject spark git commit: SPARK-4963 [SQL] Add copy to SQL's Sample operator
Date Sat, 10 Jan 2015 22:19:40 GMT
Repository: spark
Updated Branches:
  refs/heads/master b3e86dc62 -> 77106df69


SPARK-4963 [SQL] Add copy to SQL's Sample operator

https://issues.apache.org/jira/browse/SPARK-4963
SchemaRDD.sample() returns wrong results because GapSamplingIterator operates on mutable rows.
HiveTableScan builds an RDD of SpecificMutableRow, and SchemaRDD.sample() returns a
GapSamplingIterator for iterating over it.

  override def next(): T = {
    val r = data.next()
    advance
    r
  }

GapSamplingIterator.next() takes the current underlying element and assigns it to r.
However, if the underlying iterator yields a reused mutable row, as the one from HiveTableScan
does, the underlying iterator's current element and r point to the same object.
The advance operation then drops some underlying elements, which also changes r; this is not
expected. As a result, next() returns a value different from the initial r.
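
The aliasing described above can be reproduced outside Spark with a minimal sketch.
MutableRow, reusingIterator and SkippingIterator are hypothetical stand-ins, not Spark
classes; SkippingIterator only mimics the next()-then-advance shape of GapSamplingIterator
and uses a fixed gap instead of random sampling.

```scala
// Hypothetical stand-in for a reusable mutable row; not a Spark class.
final class MutableRow(var value: Int) {
  def copy(): MutableRow = new MutableRow(value)
}

object AliasingDemo {
  // Mimics a scan that reuses a single row object for every record.
  def reusingIterator(values: Seq[Int]): Iterator[MutableRow] = {
    val shared = new MutableRow(0)
    values.iterator.map { v => shared.value = v; shared }
  }

  // Same shape as the next() quoted above: take the current element,
  // advance past `gap` further elements, then return the element taken first.
  final class SkippingIterator[T](data: Iterator[T], gap: Int) extends Iterator[T] {
    override def hasNext: Boolean = data.hasNext

    override def next(): T = {
      val r = data.next()
      advance()
      r
    }

    private def advance(): Unit = {
      var i = 0
      while (i < gap && data.hasNext) { data.next(); i += 1 }
    }
  }

  // Values the consumer observes when sampling the reused row without a copy.
  def sampledValues(): List[Int] =
    new SkippingIterator(reusingIterator(Seq(0, 1, 2, 3, 4, 5)), gap = 1)
      .map(_.value)
      .toList
}
```

With input 0 to 5 and a gap of 1, the intended sample is 0, 2, 4, but the consumer sees
the values of the skipped elements instead, because r and the reused row are the same object.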

To fix this issue, the most direct way is to make HiveTableScan return copies of its mutable
rows, as in the initial commit that I made. With this solution HiveTableScan cannot take full
advantage of the reusable MutableRow, but the sample operation returns correct results.
Furthermore, we need to investigate whether GapSamplingIterator.next() can perform the copy
itself. To achieve this, every element type that an RDD can store would have to implement
something like Cloneable, which would be a huge change.
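
As a rough illustration of why copying each row before sampling restores correct results
(the approach taken in the diff below via map(_.copy())), here is a self-contained sketch.
ReusableRow, scan and keepEveryOther are hypothetical names, not Spark APIs, and the fixed
keep-one-skip-one pattern stands in for random gap sampling.

```scala
// Hypothetical stand-in for a reusable mutable row; not a Spark class.
final class ReusableRow(var key: Int) {
  def copy(): ReusableRow = new ReusableRow(key)
}

object CopyBeforeSampleDemo {
  // Mimics a scan that hands out the same row object for every record.
  def scan(keys: Seq[Int]): Iterator[ReusableRow] = {
    val shared = new ReusableRow(0)
    keys.iterator.map { k => shared.key = k; shared }
  }

  // Keeps one element, then discards the next one (the "advance" step).
  def keepEveryOther[T](data: Iterator[T]): Iterator[T] = new Iterator[T] {
    override def hasNext: Boolean = data.hasNext

    override def next(): T = {
      val r = data.next()
      if (data.hasNext) data.next()
      r
    }
  }

  val keys: Seq[Int] = Seq(0, 2, 4, 6)

  // Sampling the reused row directly: each kept row is overwritten by the skipped one.
  def withoutCopy: List[Int] = keepEveryOther(scan(keys)).map(_.key).toList

  // Copying first: the sampler holds its own snapshot of every row.
  def withCopy: List[Int] = keepEveryOther(scan(keys).map(_.copy())).map(_.key).toList
}
```

The copy costs one allocation per input row, which is the trade-off the message above
describes: the benefit of the reusable row is lost, but the sampled values are stable.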

Author: Yanbo Liang <yanbohappy@gmail.com>

Closes #3827 from yanbohappy/spark-4963 and squashes the following commits:

0912ca0 [Yanbo Liang] code format keep
65c4e7c [Yanbo Liang] import file and clear annotation
55c7c56 [Yanbo Liang] better output of test case
cea7e2e [Yanbo Liang] SchemaRDD add copy operation before Sample operator
e840829 [Yanbo Liang] HiveTableScan return mutable row with copy


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77106df6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77106df6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77106df6

Branch: refs/heads/master
Commit: 77106df69147aba5eb1784adb84e2b574927c6de
Parents: b3e86dc
Author: Yanbo Liang <yanbohappy@gmail.com>
Authored: Sat Jan 10 14:16:37 2015 -0800
Committer: Michael Armbrust <michael@databricks.com>
Committed: Sat Jan 10 14:19:32 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/basicOperators.scala |  2 +-
 .../apache/spark/sql/hive/execution/SQLQuerySuite.scala | 12 ++++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/77106df6/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index e53723c..16ca4be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -70,7 +70,7 @@ case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child:
   override def output = child.output
 
   // TODO: How to pick seed?
-  override def execute() = child.execute().sample(withReplacement, fraction, seed)
+  override def execute() = child.execute().map(_.copy()).sample(withReplacement, fraction, seed)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/77106df6/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 5d0fb72..c1c3683 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.QueryTest
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.util.Utils
 
 case class Nested1(f1: Nested2)
 case class Nested2(f2: Nested3)
@@ -202,4 +203,15 @@ class SQLQuerySuite extends QueryTest {
     checkAnswer(sql("SELECT sum( distinct key) FROM src group by key order by key"),
       sql("SELECT distinct key FROM src order by key").collect().toSeq)
   }
+
+  test("SPARK-4963 SchemaRDD sample on mutable row return wrong result") {
+    sql("SELECT * FROM src WHERE key % 2 = 0")
+      .sample(withReplacement = false, fraction = 0.3)
+      .registerTempTable("sampled")
+    (1 to 10).foreach { i =>
+      checkAnswer(
+        sql("SELECT * FROM sampled WHERE key % 2 = 1"),
+        Seq.empty[Row])
+    }
+  }
 }



