kudu-commits mailing list archives

From danburk...@apache.org
Subject [4/4] kudu git commit: [spark] KUDU-1631 push down StringStartsWith filters
Date Fri, 06 Jan 2017 19:59:06 GMT
[spark] KUDU-1631 push down StringStartsWith filters

This adds support for pushing down StringStartsWith filters to
Kudu by converting each prefix predicate into an equivalent range
predicate: a string starts with prefix p exactly when it is >= p and
< the smallest string that is greater than every string prefixed by p
(when such an upper bound exists).
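
A minimal self-contained sketch of the conversion, for illustration
only (prefixRange is a hypothetical name; the committed logic lives
in DefaultSource.scala below):

    // Returns the inclusive lower bound and, when one exists, the
    // exclusive upper bound of the range containing exactly the
    // strings that start with p.
    def prefixRange(p: String): (String, Option[String]) = {
      // Trailing Char.MaxValue characters cannot be incremented, so
      // drop them before computing the upper bound.
      val q = p.reverse.dropWhile(_ == Char.MaxValue).reverse
      // If nothing remains, no finite upper bound exists; otherwise
      // increment the last remaining character.
      val upper = if (q.isEmpty) None else Some(q.init + (q.last + 1).toChar)
      (p, upper)
    }

    prefixRange("abc")  // ("abc", Some("abd"))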

Change-Id: Ieb50486fe3f3b44a241a554a953e4e2c81f17be4
Reviewed-on: http://gerrit.cloudera.org:8080/5461
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <danburkert@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e126bde8
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e126bde8
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e126bde8

Branch: refs/heads/master
Commit: e126bde8ddfc59f3be238c8a01b0d6392f4c0d19
Parents: 75806ea
Author: Will Berkeley <wdberkeley@gmail.com>
Authored: Fri Dec 9 20:20:09 2016 -0500
Committer: Dan Burkert <danburkert@apache.org>
Committed: Fri Jan 6 19:58:27 2017 +0000

----------------------------------------------------------------------
 .../apache/kudu/spark/kudu/DefaultSource.scala  | 26 +++++++++--
 .../kudu/spark/kudu/DefaultSourceTest.scala     | 49 +++++++++++++++++++-
 2 files changed, 70 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/e126bde8/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 1201591..198d09f 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -20,12 +20,10 @@ package org.apache.kudu.spark.kudu
 import java.sql.Timestamp
 
 import scala.collection.JavaConverters._
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
-
 import org.apache.kudu.Type
 import org.apache.kudu.annotations.InterfaceStability
 import org.apache.kudu.client.KuduPredicate.ComparisonOp
@@ -171,12 +169,33 @@ with InsertableRelation {
         Array(comparisonPredicate(column, ComparisonOp.LESS_EQUAL, value))
       case In(column, values) =>
         Array(inListPredicate(column, values))
+      case StringStartsWith(column, prefix) =>
+        prefixInfimum(prefix) match {
+          case None => Array(comparisonPredicate(column, ComparisonOp.GREATER_EQUAL, prefix))
+          case Some(inf) =>
+            Array(comparisonPredicate(column, ComparisonOp.GREATER_EQUAL, prefix),
+                  comparisonPredicate(column, ComparisonOp.LESS, inf))
+        }
       case And(left, right) => filterToPredicate(left) ++ filterToPredicate(right)
       case _ => Array()
     }
   }
 
   /**
+    * Returns the smallest string s such that t < s for every string t
+    * that starts with prefix p, if such a string exists.
+    *
+    * @param p the prefix
+    * @return Some(the prefix infimum), or None if none exists.
+    */
+  private def prefixInfimum(p: String): Option[String] = {
+    p.reverse.dropWhile(_ == Char.MaxValue).reverse match {
+      case "" => None
+      case q => Some(q.slice(0, q.length - 1) + (q(q.length - 1) + 1).toChar)
+    }
+  }
+
+  /**
    * Creates a new comparison predicate for the column, comparison operator, and comparison value.
     *
     * @param column the column name
@@ -261,7 +280,8 @@ private[spark] object KuduRelation {
        | GreaterThanOrEqual(_, _)
        | LessThan(_, _)
        | LessThanOrEqual(_, _)
-       | In(_, _) => true
+       | In(_, _)
+       | StringStartsWith(_, _) => true
     case And(left, right) => supportsFilter(left) && supportsFilter(right)
     case _ => false
   }
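
For reference, a few illustrative evaluations of prefixInfimum as
defined above (not part of the commit):

    prefixInfimum("abc")                   // Some("abd")
    prefixInfimum("ab" + Char.MaxValue)    // Some("ac"): the trailing max char is dropped, then 'b' is incremented
    prefixInfimum("")                      // None: every string starts with the empty prefix
    prefixInfimum(Char.MaxValue.toString)  // None: no upper bound exists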

http://git-wip-us.apache.org/repos/asf/kudu/blob/e126bde8/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 894a5d5..5ba1138 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.immutable.IndexedSeq
 import scala.util.control.NonFatal
 
+import com.google.common.collect.ImmutableList
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.functions._
 import org.junit.Assert._
@@ -31,6 +32,8 @@ import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
+import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
+import org.apache.kudu.{Schema, Type}
 import org.apache.kudu.client.CreateTableOptions;
 
 @RunWith(classOf[JUnitRunner])
@@ -330,6 +333,47 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter {
     assert(results.get(0).getString(1).equals("2"))
   }
 
+  test("Test SparkSQL StringStartsWith filters") {
+    // This test requires a table with a string primary key, so create one here.
+    val testTableName = "startswith"
+    val schema = new Schema(ImmutableList.of(
+      new ColumnSchemaBuilder("key", Type.STRING).key(true).build()))
+    val tableOptions = new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
+      .setNumReplicas(1)
+    val testTable = kuduClient.createTable(testTableName, schema, tableOptions)
+
+    val kuduSession = kuduClient.newSession()
+    val chars = List('a', 'b', '乕', Char.MaxValue, '\0')
+    val keys = for (x <- chars; y <- chars; z <- chars; w <- chars) yield Array(x, y, z, w).mkString
+    keys.foreach { key =>
+      val insert = testTable.newInsert
+      val row = insert.getRow
+      row.addString(0, key)
+      kuduSession.apply(insert)
+    }
+    val options: Map[String, String] = Map(
+      "kudu.table" -> testTableName,
+      "kudu.master" -> miniCluster.getMasterAddresses)
+    sqlContext.read.options(options).kudu.registerTempTable(testTableName)
+
+    val checkPrefixCount = { prefix: String =>
+      val results = sqlContext.sql(s"SELECT key FROM $testTableName WHERE key LIKE '$prefix%'")
+      assertEquals(keys.count(k => k.startsWith(prefix)), results.count())
+    }
+    // empty string
+    checkPrefixCount("")
+    // one character
+    for (x <- chars) {
+      checkPrefixCount(Array(x).mkString)
+    }
+    // all two character combos
+    for (x <- chars; y <- chars) {
+      checkPrefixCount(Array(x, y).mkString)
+    }
+  }
+
   test("Test SQL: insert into") {
     val insertTable = "insertintotest"
 
@@ -365,10 +409,11 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter {
 
     try {
       sqlContext.sql(s"INSERT OVERWRITE TABLE $insertTable SELECT * FROM $tableName")
-      fail()
+      fail("insert overwrite should throw UnsupportedOperationException")
     } catch {
       case _: UnsupportedOperationException => // good
-      case NonFatal(_) => fail()
+      case NonFatal(_) =>
+        fail("insert overwrite should throw UnsupportedOperationException")
     }
   }
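
For illustration, with this change a prefix filter written in either
of the following forms is pushed to Kudu as the range predicates
key >= "abc" AND key < "abd" rather than evaluated in Spark (the
DataFrame df is hypothetical; the temp table name is the one
registered in the test above):

    import org.apache.spark.sql.functions.col

    // Both forms surface as a StringStartsWith source filter, which
    // DefaultSource now converts to a Kudu range scan.
    sqlContext.sql("SELECT key FROM startswith WHERE key LIKE 'abc%'")
    df.filter(col("key").startsWith("abc"))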
 

