spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sro...@apache.org
Subject spark git commit: [SPARK-5860][CORE] JdbcRDD: overflow on large range with high number of partitions
Date Sat, 21 Feb 2015 20:40:26 GMT
Repository: spark
Updated Branches:
  refs/heads/master 7138816ab -> 7683982fa


[SPARK-5860][CORE] JdbcRDD: overflow on large range with high number of partitions

Fix an overflow bug in JdbcRDD when calculating partitions for large BIGINT ids

Author: Evan Yu <ehotou@gmail.com>

Closes #4701 from hotou/SPARK-5860 and squashes the following commits:

9e038d1 [Evan Yu] [SPARK-5860][CORE] Prevent overflowing at the length level
7883ad9 [Evan Yu] [SPARK-5860][CORE] Prevent overflowing at the length level
c88755a [Evan Yu] [SPARK-5860][CORE] switch to BigInt instead of BigDecimal
4e9ff4f [Evan Yu] [SPARK-5860][CORE] JdbcRDD overflow on large range with high number of partitions


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7683982f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7683982f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7683982f

Branch: refs/heads/master
Commit: 7683982faf920b8ac6cf46b79842450e7d46c5cc
Parents: 7138816
Author: Evan Yu <ehotou@gmail.com>
Authored: Sat Feb 21 20:40:21 2015 +0000
Committer: Sean Owen <sowen@cloudera.com>
Committed: Sat Feb 21 20:40:21 2015 +0000

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/JdbcRDD.scala    |  8 +--
 .../org/apache/spark/rdd/JdbcRDDSuite.scala     | 60 +++++++++++++++-----
 2 files changed, 50 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7683982f/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index 4fe7622..e226786 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -62,11 +62,11 @@ class JdbcRDD[T: ClassTag](
 
   override def getPartitions: Array[Partition] = {
     // bounds are inclusive, hence the + 1 here and - 1 on end
-    val length = 1 + upperBound - lowerBound
+    val length = BigInt(1) + upperBound - lowerBound
     (0 until numPartitions).map(i => {
-      val start = lowerBound + ((i * length) / numPartitions).toLong
-      val end = lowerBound + (((i + 1) * length) / numPartitions).toLong - 1
-      new JdbcPartition(i, start, end)
+      val start = lowerBound + ((i * length) / numPartitions)
+      val end = lowerBound + (((i + 1) * length) / numPartitions) - 1
+      new JdbcPartition(i, start.toLong, end.toLong)
     }).toArray
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7683982f/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
index 6138d0b..0dc5988 100644
--- a/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
@@ -29,22 +29,42 @@ class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
     Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
     val conn = DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb;create=true")
     try {
-      val create = conn.createStatement
-      create.execute("""
-        CREATE TABLE FOO(
-          ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
-          DATA INTEGER
-        )""")
-      create.close()
-      val insert = conn.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)")
-      (1 to 100).foreach { i =>
-        insert.setInt(1, i * 2)
-        insert.executeUpdate
+
+      try {
+        val create = conn.createStatement
+        create.execute("""
+          CREATE TABLE FOO(
+            ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
+            DATA INTEGER
+          )""")
+        create.close()
+        val insert = conn.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)")
+        (1 to 100).foreach { i =>
+          insert.setInt(1, i * 2)
+          insert.executeUpdate
+        }
+        insert.close()
+      } catch {
+        case e: SQLException if e.getSQLState == "X0Y32" =>
+        // table exists
       }
-      insert.close()
-    } catch {
-      case e: SQLException if e.getSQLState == "X0Y32" =>
+
+      try {
+        val create = conn.createStatement
+        create.execute("CREATE TABLE BIGINT_TEST(ID BIGINT NOT NULL, DATA INTEGER)")
+        create.close()
+        val insert = conn.prepareStatement("INSERT INTO BIGINT_TEST VALUES(?,?)")
+        (1 to 100).foreach { i =>
+          insert.setLong(1, 100000000000000000L +  4000000000000000L * i)
+          insert.setInt(2, i)
+          insert.executeUpdate
+        }
+        insert.close()
+      } catch {
+        case e: SQLException if e.getSQLState == "X0Y32" =>
         // table exists
+      }
+
     } finally {
       conn.close()
     }
@@ -62,6 +82,18 @@ class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext
{
     assert(rdd.count === 100)
     assert(rdd.reduce(_+_) === 10100)
   }
+  
+  test("large id overflow") {
+    sc = new SparkContext("local", "test")
+    val rdd = new JdbcRDD(
+      sc,
+      () => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") },
+      "SELECT DATA FROM BIGINT_TEST WHERE ? <= ID AND ID <= ?",
+      1131544775L, 567279358897692673L, 20,
+      (r: ResultSet) => { r.getInt(1) } ).cache()
+    assert(rdd.count === 100)
+    assert(rdd.reduce(_+_) === 5050)
+  }
 
   after {
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message