kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From danburk...@apache.org
Subject incubator-kudu git commit: Adding kudu datasource for spark
Date Tue, 26 Apr 2016 22:45:44 GMT
Repository: incubator-kudu
Updated Branches:
  refs/heads/master 342b7a704 -> 938f7feaf


Adding kudu datasource for spark

Change-Id: I0f91772f58e9eee9de45901866867e9a5014cfbe
Reviewed-on: http://gerrit.cloudera.org:8080/2848
Reviewed-by: Dan Burkert <dan@cloudera.com>
Tested-by: Dan Burkert <dan@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/938f7fea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/938f7fea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/938f7fea

Branch: refs/heads/master
Commit: 938f7feaf90d76bc7f4d2823c61808a9b140168f
Parents: 342b7a7
Author: cgeorge <chris.george@rms.com>
Authored: Mon Apr 11 11:46:13 2016 -0600
Committer: Dan Burkert <dan@cloudera.com>
Committed: Tue Apr 26 22:45:10 2016 +0000

----------------------------------------------------------------------
 docs/developing.adoc                            |  22 ++
 docs/release_notes.adoc                         |   4 +
 java/kudu-spark/pom.xml                         |  47 ++--
 .../scala/org/kududb/spark/DefaultSource.scala  | 154 ------------
 .../scala/org/kududb/spark/KuduContext.scala    |  91 -------
 .../org/kududb/spark/kudu/DefaultSource.scala   | 235 +++++++++++++++++++
 .../org/kududb/spark/kudu/KuduContext.scala     |  78 ++++++
 .../scala/org/kududb/spark/kudu/KuduRDD.scala   | 133 +++++++++++
 .../scala/org/kududb/spark/kudu/package.scala   |  30 +++
 .../src/test/resources/log4j.properties         |   4 +-
 .../org/kududb/spark/DefaultSourceTest.scala    |  44 ----
 .../org/kududb/spark/KuduContextTest.scala      |  35 ---
 .../scala/org/kududb/spark/TestContext.scala    |  89 -------
 .../kududb/spark/kudu/DefaultSourceTest.scala   | 171 ++++++++++++++
 .../org/kududb/spark/kudu/KuduContextTest.scala |  35 +++
 .../org/kududb/spark/kudu/TestContext.scala     | 115 +++++++++
 16 files changed, 847 insertions(+), 440 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/docs/developing.adoc
----------------------------------------------------------------------
diff --git a/docs/developing.adoc b/docs/developing.adoc
index d0fe73e..89ee517 100644
--- a/docs/developing.adoc
+++ b/docs/developing.adoc
@@ -96,6 +96,28 @@ example Maven pom.xml files.
 See link:kudu_impala_integration.html[Using Impala With Kudu] for guidance on installing
 and using Impala with Kudu, including several `impala-shell` examples.
 
+== Kudu integration with Spark
+
+Kudu integrates with Spark through the Spark data source API as of version 0.9.
+Include the kudu-spark jar using the `--jars` option:
+[source]
+----
+spark-shell --jars /kudu-spark-0.9.0.jar
+----
+Then import kudu-spark and create a dataframe:
+[source]
+----
+// Import kudu datasource
+import org.kududb.spark.kudu._
+val kuduDataFrame =  sqlContext.read.options(Map("kudu.master"-> "your.kudu.master.here","kudu.table"-> "your.kudu.table.here")).kudu
+// Then query using spark api or register a temporary table and use spark sql
+kuduDataFrame.select("id").filter("id >= 5").show()
+// Register kuduDataFrame as a temporary table for spark-sql
+kuduDataFrame.registerTempTable("kudu_table")
+// Select from the dataframe
+sqlContext.sql("select id from kudu_table where id>=5").show()
+----
+
 == Integration with MapReduce, YARN, and Other Frameworks
 
 Kudu was designed to integrate with MapReduce, YARN, Spark, and other frameworks in

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/docs/release_notes.adoc
----------------------------------------------------------------------
diff --git a/docs/release_notes.adoc b/docs/release_notes.adoc
index 017cb6c..1f0a3d3 100644
--- a/docs/release_notes.adoc
+++ b/docs/release_notes.adoc
@@ -71,6 +71,10 @@ Hadoop storage technologies.
   for creating partition-aware scan descriptors. Can be used by clients and
   query engines to more easily execute parallel scans.
 
+- link:http://gerrit.cloudera.org:8080/#/c/2848/[Gerrit 2848] Added a Kudu datasource for Spark which uses the Kudu client directly instead of
+  using the MapReduce API. Includes predicate pushdown for Spark SQL and Spark filters,
+  parallel retrieval from multiple tablets, and column projection. See link:developing.html#_kudu_integration_with_spark[Kudu integration with Spark Example].
+
 [[rn_0.8.0]]
 === Release notes specific to 0.8.0
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/pom.xml
----------------------------------------------------------------------
diff --git a/java/kudu-spark/pom.xml b/java/kudu-spark/pom.xml
index 590b45d..cc0d7fe 100644
--- a/java/kudu-spark/pom.xml
+++ b/java/kudu-spark/pom.xml
@@ -24,7 +24,7 @@
     <name>Kudu Spark Bindings</name>
 
     <properties>
-        <spark.version>1.3.0</spark.version>
+        <spark.version>1.6.1</spark.version>
         <scala.version>2.10.4</scala.version>
         <scala.binary.version>2.10</scala.binary.version>
         <top.dir>${project.basedir}/..</top.dir>
@@ -55,19 +55,6 @@
             <version>${spark.version}</version>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-            <version>${spark.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-            <version>${spark.version}</version>
-            <type>test-jar</type>
-            <classifier>tests</classifier>
-            <scope>test</scope>
-        </dependency>
 
         <dependency>
             <groupId>org.kududb</groupId>
@@ -88,17 +75,6 @@
         </dependency>
 
         <dependency>
-            <groupId>org.kududb</groupId>
-            <artifactId>kudu-mapreduce</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-
-        <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
             <version>${scala.version}</version>
@@ -164,6 +140,27 @@
               <groupId>org.apache.maven.plugins</groupId>
               <artifactId>maven-surefire-plugin</artifactId>
             </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.3</version>
+                <configuration>
+                    <artifactSet>
+                        <includes>
+                            <include>*:*</include>
+                        </includes>
+                    </artifactSet>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/main/scala/org/kududb/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/kududb/spark/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/kududb/spark/DefaultSource.scala
deleted file mode 100644
index 447b059..0000000
--- a/java/kudu-spark/src/main/scala/org/kududb/spark/DefaultSource.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.kududb.spark
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Row, SQLContext}
-import org.kududb.Type
-import org.kududb.annotations.InterfaceStability
-import org.kududb.client.RowResult
-
-import scala.collection.JavaConverters._
-import scala.collection.immutable.HashMap
-
-/**
-  * DefaultSource for integration with Spark's dataframe datasources.
-  * This class will produce a relationProvider based on input given to it from spark.
-  */
-@InterfaceStability.Unstable
-class DefaultSource extends RelationProvider {
-
-  val TABLE_KEY = "kudu.table"
-  val KUDU_MASTER = "kudu.master"
-
-  /**
-    * Construct a BaseRelation using the provided context and parameters.
-    *
-    * @param sqlContext SparkSQL context
-    * @param parameters parameters given to us from SparkSQL
-    * @return           a BaseRelation Object
-    */
-  override def createRelation(sqlContext: SQLContext,
-                              parameters: Map[String, String]):
-  BaseRelation = {
-    val tableName = parameters.get(TABLE_KEY)
-    if (tableName.isEmpty) {
-      throw new IllegalArgumentException(s"Invalid value for $TABLE_KEY '$tableName'")
-    }
-
-    val kuduMaster = parameters.getOrElse(KUDU_MASTER, "localhost")
-
-    new KuduRelation(tableName.get, kuduMaster)(sqlContext)
-  }
-}
-
-/**
-  * Implementation of Spark BaseRelation.
-  *
-  * @param tableName Kudu table that we plan to read from
-  * @param kuduMaster Kudu master addresses
-  * @param sqlContext SparkSQL context
-  */
-@InterfaceStability.Unstable
-class KuduRelation(val tableName: String,
-                   val kuduMaster: String)(
-                   @transient val sqlContext: SQLContext)
-  extends BaseRelation with PrunedFilteredScan with Serializable {
-
-  val typesMapping = HashMap[Type, DataType](
-    Type.INT16 -> IntegerType,
-    Type.INT32 -> IntegerType,
-    Type.INT64 -> LongType,
-    Type.FLOAT -> FloatType,
-    Type.DOUBLE -> DoubleType,
-    Type.STRING -> StringType,
-    Type.TIMESTAMP -> TimestampType,
-    Type.BINARY -> BinaryType
-  )
-
-  // Using lazy val for the following because we can't serialize them but we need them once we
-  // deserialize them.
-  @transient lazy val kuduContext = new KuduContext(kuduMaster)
-  @transient lazy val kuduTable = kuduContext.syncClient.openTable(tableName)
-  @transient lazy val tableColumns = kuduTable.getSchema.getColumns.asScala
-  @transient lazy val kuduSchemaColumnMap = tableColumns.map(c => (c.getName, c)).toMap
-
-  /**
-    * Generates a SparkSQL schema object so SparkSQL knows what is being
-    * provided by this BaseRelation.
-    *
-    * @return schema generated from the Kudu table's schema
-    */
-  override def schema: StructType = {
-    val metadataBuilder = new MetadataBuilder()
-
-    val structFieldArray: Array[StructField] =
-      tableColumns.map { columnSchema =>
-        val columnSparkSqlType = typesMapping.getOrElse(
-          columnSchema.getType,
-          throw new IllegalArgumentException(s"Unsupported column type: ${columnSchema.getType}"))
-
-        val metadata = metadataBuilder.putString("name", columnSchema.getName).build()
-        new StructField(columnSchema.getName, columnSparkSqlType,
-                        nullable = columnSchema.isNullable, metadata)
-      }.toArray
-
-    new StructType(structFieldArray)
-  }
-
-  /**
-    * Build the RDD to scan rows.
-    *
-    * @param requiredColumns columns that are being requested by the requesting query
-    * @param filters         filters that are being applied by the requesting query
-    * @return                RDD with all the results from Kudu
-    */
-  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
-    kuduContext.kuduRDD(sqlContext.sparkContext, tableName, requiredColumns).map { row =>
-      // TODO use indexes instead of column names since it requires one less mapping.
-      Row.fromSeq(requiredColumns.map(column => getKuduValue(row, column)))
-    }
-  }
-
-  private def getKuduValue(row: RowResult, columnName: String): Any = {
-    val columnSchema = kuduSchemaColumnMap.getOrElse(columnName,
-      throw new IllegalArgumentException(s"Couldn't find column '$columnName'"))
-
-    if (columnSchema.isNullable && row.isNull(columnName)) {
-      return null
-    }
-
-    val columnType = columnSchema.getType
-
-    columnType match {
-      case Type.BINARY => row.getBinary(columnName)
-      case Type.BOOL => row.getBoolean(columnName)
-      case Type.DOUBLE => row.getDouble(columnName)
-      case Type.FLOAT => row.getFloat(columnName)
-      case Type.INT16 => row.getShort(columnName)
-      case Type.INT32 => row.getInt(columnName)
-      case Type.INT64 => row.getLong(columnName)
-      case Type.INT8 => row.getByte(columnName)
-      case Type.TIMESTAMP => row.getLong(columnName)
-      case Type.STRING => row.getString(columnName)
-      case _ => throw new IllegalArgumentException(s"Type not supported: '${columnType.getName}'")
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/main/scala/org/kududb/spark/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/kududb/spark/KuduContext.scala b/java/kudu-spark/src/main/scala/org/kududb/spark/KuduContext.scala
deleted file mode 100644
index c7bfe3e..0000000
--- a/java/kudu-spark/src/main/scala/org/kududb/spark/KuduContext.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.kududb.spark
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.util.ShutdownHookManager
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.kududb.annotations.InterfaceStability
-import org.kududb.client.{AsyncKuduClient, KuduClient, RowResult}
-import org.kududb.mapreduce.KuduTableInputFormat
-
-/**
-  * KuduContext is a façade for Kudu operations.
-  *
-  * If a Kudu client connection is needed as part of a Spark application, a
-  * [[KuduContext]] should be used as a broadcast variable in the job in order to
-  * share connections among the tasks in a JVM.
-  */
-@InterfaceStability.Unstable
-class KuduContext(kuduMaster: String) extends Serializable {
-
-  /**
-    * Set to
-    * [[org.apache.spark.util.ShutdownHookManager.DEFAULT_SHUTDOWN_PRIORITY]].
-    * The client instances are closed through the JVM shutdown hook
-    * mechanism in order to make sure that any unflushed writes are cleaned up
-    * properly. Spark has no way of notifying the [[DefaultSource]] on shutdown.
-    */
-  private val ShutdownHookPriority = 100
-
-  @transient lazy val syncClient = {
-    val syncClient = new KuduClient.KuduClientBuilder(kuduMaster).build()
-    ShutdownHookManager.get().addShutdownHook(new Runnable {
-      override def run() = syncClient.close()
-    }, ShutdownHookPriority)
-    syncClient
-  }
-  @transient lazy val asyncClient = {
-    val asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build()
-    ShutdownHookManager.get().addShutdownHook(
-      new Runnable {
-        override def run() = asyncClient.close()
-      }, ShutdownHookPriority)
-    asyncClient
-  }
-
-  /**
-    * Create an RDD from a Kudu table.
-    *
-    * @param tableName          table to read from
-    * @param columnProjection   list of columns to read
-    *
-    * Not specifying this at all (i.e. setting to null) or setting to the special string
-    * '*' means to project all columns.
-    * @return a new RDD that maps over the given table for the selected columns
-    */
-  def kuduRDD(sc: SparkContext,
-              tableName: String,
-              columnProjection: Seq[String] = Nil): RDD[RowResult] = {
-
-    val conf = new Configuration
-    conf.set("kudu.mapreduce.master.address", kuduMaster)
-    conf.set("kudu.mapreduce.input.table", tableName)
-    if (columnProjection.nonEmpty) {
-      conf.set("kudu.mapreduce.column.projection", columnProjection.mkString(","))
-    }
-
-    val rdd = sc.newAPIHadoopRDD(conf, classOf[KuduTableInputFormat],
-                                 classOf[NullWritable], classOf[RowResult])
-
-    val columnNames = if (columnProjection.nonEmpty) columnProjection.mkString(", ") else "(*)"
-    rdd.values.setName(s"KuduRDD { table=$tableName, columnProjection=$columnNames }")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala
new file mode 100644
index 0000000..6226862
--- /dev/null
+++ b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kududb.spark.kudu
+
+import java.sql.Timestamp
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Row, SQLContext}
+import org.kududb.Type
+import org.kududb.annotations.InterfaceStability
+import org.kududb.client.{KuduPredicate, KuduTable}
+import org.kududb.client.KuduPredicate.ComparisonOp
+
+import scala.collection.JavaConverters._
+
+/**
+  * DefaultSource for integration with Spark's dataframe datasources.
+  * This class will produce a relationProvider based on input given to it from spark.
+  */
+@InterfaceStability.Unstable
+class DefaultSource extends RelationProvider {
+
+  val TABLE_KEY = "kudu.table"
+  val KUDU_MASTER = "kudu.master"
+
+  /**
+    * Construct a BaseRelation using the provided context and parameters.
+    *
+    * @param sqlContext SparkSQL context
+    * @param parameters parameters given to us from SparkSQL
+    * @return           a BaseRelation Object
+    */
+  override def createRelation(sqlContext: SQLContext,
+                              parameters: Map[String, String]):
+  BaseRelation = {
+    val tableName = parameters.get(TABLE_KEY)
+    if (tableName.isEmpty) {
+      throw new IllegalArgumentException(s"Invalid value for $TABLE_KEY '$tableName'")
+    }
+
+    val kuduMaster = parameters.getOrElse(KUDU_MASTER, "localhost")
+
+    new KuduRelation(tableName.get, kuduMaster)(sqlContext)
+  }
+}
+
+/**
+  * Implementation of Spark BaseRelation.
+  *
+  * @param tableName Kudu table that we plan to read from
+  * @param kuduMaster Kudu master addresses
+  * @param sqlContext SparkSQL context
+  */
+@InterfaceStability.Unstable
+class KuduRelation(private val tableName: String,
+                   private val kuduMaster: String)(
+                   val sqlContext: SQLContext)
+extends BaseRelation
+with PrunedFilteredScan {
+  import KuduRelation._
+
+  private val context: KuduContext = new KuduContext(kuduMaster)
+  private val table: KuduTable = context.syncClient.openTable(tableName)
+
+  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
+    filters.filterNot(supportsFilter)
+
+  /**
+    * Generates a SparkSQL schema object so SparkSQL knows what is being
+    * provided by this BaseRelation.
+    *
+    * @return schema generated from the Kudu table's schema
+    */
+  override def schema: StructType = {
+    val fields: Array[StructField] =
+      table.getSchema.getColumns.asScala.map { columnSchema =>
+        val sparkType = kuduTypeToSparkType(columnSchema.getType)
+        new StructField(columnSchema.getName, sparkType, columnSchema.isNullable)
+      }.toArray
+
+    new StructType(fields)
+  }
+
+  /**
+    * Build the RDD to scan rows.
+    *
+    * @param requiredColumns columns that are being requested by the requesting query
+    * @param filters         filters that are being applied by the requesting query
+    * @return                RDD with all the results from Kudu
+    */
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    val predicates = filters.flatMap(filterToPredicate)
+    new KuduRDD(kuduMaster, 1024*1024*20, requiredColumns, predicates,
+                table, context, sqlContext.sparkContext)
+  }
+
+  /**
+    * Converts a Spark [[Filter]] to a Kudu [[KuduPredicate]].
+    *
+    * @param filter the filter to convert
+    * @return the converted filter
+    */
+  private def filterToPredicate(filter : Filter) : Array[KuduPredicate] = {
+    filter match {
+      case EqualTo(column, value) =>
+        Array(comparisonPredicate(column, ComparisonOp.EQUAL, value))
+      case GreaterThan(column, value) =>
+        Array(comparisonPredicate(column, ComparisonOp.GREATER, value))
+      case GreaterThanOrEqual(column, value) =>
+        Array(comparisonPredicate(column, ComparisonOp.GREATER_EQUAL, value))
+      case LessThan(column, value) =>
+        Array(comparisonPredicate(column, ComparisonOp.LESS, value))
+      case LessThanOrEqual(column, value) =>
+        Array(comparisonPredicate(column, ComparisonOp.LESS_EQUAL, value))
+      case And(left, right) => filterToPredicate(left) ++ filterToPredicate(right)
+      case _ => Array()
+    }
+  }
+
+  /**
+    * Creates a new comparison predicate for the column, comparison operator, and comparison value.
+    *
+    * @param column the column name
+    * @param operator the comparison operator
+    * @param value the comparison value
+    * @return the comparison predicate
+    */
+  private def comparisonPredicate(column: String,
+                                  operator: ComparisonOp,
+                                  value: Any): KuduPredicate = {
+    val columnSchema = table.getSchema.getColumn(column)
+    value match {
+      case value: Boolean => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+      case value: Byte => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+      case value: Short => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+      case value: Int => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+      case value: Long => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+      case value: Timestamp => KuduPredicate.newComparisonPredicate(columnSchema, operator, timestampToMicros(value))
+      case value: Float => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+      case value: Double => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+      case value: String => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+      case value: Array[Byte] => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+    }
+  }
+}
+
+private[spark] object KuduRelation {
+  /**
+    * Converts a Kudu [[Type]] to a Spark SQL [[DataType]].
+    *
+    * @param t the Kudu type
+    * @return the corresponding Spark SQL type
+    */
+  private def kuduTypeToSparkType(t: Type): DataType = t match {
+    case Type.BOOL => BooleanType
+    case Type.INT8 => ByteType
+    case Type.INT16 => ShortType
+    case Type.INT32 => IntegerType
+    case Type.INT64 => LongType
+    case Type.TIMESTAMP => TimestampType
+    case Type.FLOAT => FloatType
+    case Type.DOUBLE => DoubleType
+    case Type.STRING => StringType
+    case Type.BINARY => BinaryType
+  }
+
+  /**
+    * Returns `true` if the filter is able to be pushed down to Kudu.
+    *
+    * @param filter the filter to test
+    */
+  private def supportsFilter(filter: Filter): Boolean = filter match {
+    case EqualTo(_, _)
+       | GreaterThan(_, _)
+       | GreaterThanOrEqual(_, _)
+       | LessThan(_, _)
+       | LessThanOrEqual(_, _) => true
+    case And(left, right) => supportsFilter(left) && supportsFilter(right)
+    case _ => false
+  }
+
+  /**
+    * Converts a [[Timestamp]] to microseconds since the Unix epoch (1970-01-01T00:00:00Z).
+    *
+    * @param timestamp the timestamp to convert to microseconds
+    * @return the microseconds since the Unix epoch
+    */
+  def timestampToMicros(timestamp: Timestamp): Long = {
+    // Number of whole milliseconds since the Unix epoch, in microseconds.
+    val millis = timestamp.getTime * 1000
+    // Sub millisecond time since the Unix epoch, in microseconds.
+    val micros = (timestamp.getNanos % 1000000) / 1000
+    if (micros >= 0) {
+      millis + micros
+    } else {
+      millis + 1000000 + micros
+    }
+  }
+
+  /**
+    * Converts a microsecond offset from the Unix epoch (1970-01-01T00:00:00Z) to a [[Timestamp]].
+    *
+    * @param micros the offset in microseconds since the Unix epoch
+    * @return the corresponding timestamp
+    */
+  def microsToTimestamp(micros: Long): Timestamp = {
+    var millis = micros / 1000
+    var nanos = (micros % 1000000) * 1000
+    if (nanos < 0) {
+      millis -= 1
+      nanos += 1000000000
+    }
+
+    val timestamp = new Timestamp(millis)
+    timestamp.setNanos(nanos.asInstanceOf[Int])
+    timestamp
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala
new file mode 100644
index 0000000..47984b2
--- /dev/null
+++ b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kududb.spark.kudu
+
+import org.apache.hadoop.util.ShutdownHookManager
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.kududb.annotations.InterfaceStability
+import org.kududb.client.{AsyncKuduClient, KuduClient}
+
+/**
+  * KuduContext is a serializable container for Kudu client connections.
+  *
+  * If a Kudu client connection is needed as part of a Spark application, a
+  * [[KuduContext]] should be used as a broadcast variable in the job in order to
+  * share connections among the tasks in a JVM.
+  */
+@InterfaceStability.Unstable
+class KuduContext(kuduMaster: String) extends Serializable {
+
+  /**
+    * Set to
+    * [[org.apache.spark.util.ShutdownHookManager.DEFAULT_SHUTDOWN_PRIORITY]].
+    * The client instances are closed through the JVM shutdown hook
+    * mechanism in order to make sure that any unflushed writes are cleaned up
+    * properly. Spark has no shutdown notifications.
+    */
+  private val ShutdownHookPriority = 100
+
+  @transient lazy val syncClient = {
+    val syncClient = new KuduClient.KuduClientBuilder(kuduMaster).build()
+    ShutdownHookManager.get().addShutdownHook(new Runnable {
+      override def run() = syncClient.close()
+    }, ShutdownHookPriority)
+    syncClient
+  }
+
+  @transient lazy val asyncClient = {
+    val asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build()
+    ShutdownHookManager.get().addShutdownHook(
+      new Runnable {
+        override def run() = asyncClient.close()
+      }, ShutdownHookPriority)
+    asyncClient
+  }
+
+  /**
+    * Create an RDD from a Kudu table.
+    *
+    * @param tableName          table to read from
+    * @param columnProjection   list of columns to read. Not specifying this at all
+    *                           (i.e. setting to null) or setting to the special
+    *                           string '*' means to project all columns.
+    * @return a new RDD that maps over the given table for the selected columns
+    */
+  def kuduRDD(sc: SparkContext,
+              tableName: String,
+              columnProjection: Seq[String] = Nil): RDD[Row] = {
+    new KuduRDD(kuduMaster, 1024*1024*20, columnProjection.toArray, Array(),
+                syncClient.openTable(tableName), this, sc)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduRDD.scala
new file mode 100644
index 0000000..5395d5a
--- /dev/null
+++ b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduRDD.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kududb.spark.kudu
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.kududb.client._
+import org.kududb.{Type, client}
+
+import scala.collection.JavaConverters._
+
+/**
+  * An [[RDD]] of Spark SQL rows whose contents are scanned from a Kudu table.
+  *
+  * Each partition corresponds to one Kudu scan token built from the table, the
+  * projected columns and the predicates supplied at construction time.
+  */
+class KuduRDD(val kuduMaster: String,
+              @transient batchSize: Integer,
+              @transient projectedCols: Array[String],
+              @transient predicates: Array[client.KuduPredicate],
+              @transient table: KuduTable,
+              @transient kc: KuduContext,
+              @transient sc: SparkContext) extends RDD[Row](sc, Nil) {
+
+  /**
+    * The [[KuduContext]] that supplies Kudu client instances to this `KuduRDD`.
+    *
+    * While the constructor argument `kc` is still available it is used as-is.
+    * Because `kc` is transient it is null after deserialization, in which case a
+    * fresh `KuduContext` is built from `kuduMaster`. Always go through this value
+    * rather than reading `kc` directly.
+    */
+  @transient private lazy val kuduContext: KuduContext =
+    Option(kc).getOrElse(new KuduContext(kuduMaster))
+
+  /** Builds one [[KuduPartition]] per scan token produced for the table. */
+  override protected def getPartitions: Array[Partition] = {
+    val tokenBuilder = kuduContext.syncClient
+      .newScanTokenBuilder(table)
+      .batchSizeBytes(batchSize)
+      .setProjectedColumnNames(projectedCols.toSeq.asJava)
+      .cacheBlocks(true)
+    predicates.foreach(p => tokenBuilder.addPredicate(p))
+
+    tokenBuilder.build().asScala.zipWithIndex.map { case (token, index) =>
+      val serializedToken = token.serialize()
+      // RPC hosts of the tablet replicas, later reported as preferred locations.
+      val replicaHosts = token.getTablet.getReplicas.asScala.map(_.getRpcHost).toArray
+      new KuduPartition(index, serializedToken, replicaHosts)
+    }.toArray
+  }
+
+  /** Rebuilds the scanner from the partition's serialized token and iterates its rows. */
+  override def compute(part: Partition, taskContext: TaskContext): Iterator[Row] = {
+    val kuduClient: KuduClient = kuduContext.syncClient
+    val kuduPartition = part.asInstanceOf[KuduPartition]
+    new RowResultIteratorScala(
+      KuduScanToken.deserializeIntoScanner(kuduPartition.scanToken, kuduClient))
+  }
+
+  /** Reports the hosts recorded on the partition when it was created. */
+  override def getPreferredLocations(partition: Partition): Seq[String] = {
+    val kuduPartition = partition.asInstanceOf[KuduPartition]
+    kuduPartition.locations
+  }
+}
+
+/**
+  * A Spark [[Partition]] holding a serialized [[KuduScanToken]] together with
+  * the host names to report as preferred locations for it.
+  */
+private[spark] class KuduPartition(val index: Int,
+                                   val scanToken: Array[Byte],
+                                   val locations: Array[String])
+  extends Partition
+
+/**
+  * A Spark SQL [[Row]] iterator which wraps a [[KuduScanner]].
+  *
+  * Rows are pulled from the scanner one batch at a time; `currentIterator` holds
+  * the batch that is currently being drained.
+  *
+  * @param scanner the wrapped scanner
+  */
+private[spark] class RowResultIteratorScala(private val scanner: KuduScanner) extends Iterator[Row] {
+
+  // Batch currently being consumed; null until the first batch has been fetched.
+  private var currentIterator: RowResultIterator = null
+
+  /**
+    * Reports whether another row is available, fetching further batches as needed.
+    *
+    * @return true if a following call to `next()` will yield a row
+    */
+  override def hasNext: Boolean = {
+    // Keep fetching while the current batch is missing or drained and the scanner
+    // still reports more rows. A loop (rather than a single fetch) is needed because
+    // a fetched batch may itself be empty while later batches still hold rows.
+    while ((currentIterator == null || !currentIterator.hasNext) && scanner.hasMoreRows) {
+      currentIterator = scanner.nextRows()
+    }
+    // Null-safe: if no batch was ever fetched there is nothing to return.
+    currentIterator != null && currentIterator.hasNext
+  }
+
+  /** Wraps the next result of the current batch in a [[KuduRow]]; call `hasNext` first. */
+  override def next(): Row = new KuduRow(currentIterator.next())
+}
+
+/**
+  * Adapts a Kudu [[RowResult]] to the Spark SQL [[Row]] interface.
+  * @param rowResult the wrapped row result
+  */
+private[spark] class KuduRow(private val rowResult: RowResult) extends Row {
+
+  /** Number of columns in the row's projection. */
+  override def length: Int = rowResult.getColumnProjection.getColumnCount
+
+  /** Value of column `i`, or null when the cell is null. */
+  override def get(i: Int): Any = {
+    if (rowResult.isNull(i)) {
+      null
+    } else {
+      nonNullValue(i)
+    }
+  }
+
+  // Reads column `i` with the accessor that matches its Kudu type.
+  private def nonNullValue(i: Int): Any = rowResult.getColumnType(i) match {
+    case Type.BOOL => rowResult.getBoolean(i)
+    case Type.INT8 => rowResult.getByte(i)
+    case Type.INT16 => rowResult.getShort(i)
+    case Type.INT32 => rowResult.getInt(i)
+    case Type.INT64 => rowResult.getLong(i)
+    case Type.TIMESTAMP => KuduRelation.microsToTimestamp(rowResult.getLong(i))
+    case Type.FLOAT => rowResult.getFloat(i)
+    case Type.DOUBLE => rowResult.getDouble(i)
+    case Type.STRING => rowResult.getString(i)
+    case Type.BINARY => rowResult.getBinary(i)
+  }
+
+  /** Materializes every column value into a new, independent [[Row]]. */
+  override def copy(): Row = Row.fromSeq((0 until length).map(i => get(i)))
+
+  override def toString(): String = rowResult.toString
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala
new file mode 100755
index 0000000..29ba455
--- /dev/null
+++ b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kududb.spark
+
+import org.apache.spark.sql.{DataFrame, DataFrameReader}
+
+package object kudu {
+
+  /**
+   * Enriches [[DataFrameReader]] with a `kudu` method, so a Kudu table can be
+   * loaded as a [[DataFrame]] directly from the reader.
+   */
+  implicit class KuduDataFrameReader(reader: DataFrameReader) {
+    def kudu: DataFrame = {
+      val kuduReader = reader.format("org.kududb.spark.kudu")
+      kuduReader.load()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/resources/log4j.properties b/java/kudu-spark/src/test/resources/log4j.properties
index cb277ed..94321ff 100644
--- a/java/kudu-spark/src/test/resources/log4j.properties
+++ b/java/kudu-spark/src/test/resources/log4j.properties
@@ -15,9 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.
 
-log4j.rootLogger = DEBUG, out
+log4j.rootLogger = WARN, out
 log4j.appender.out = org.apache.log4j.ConsoleAppender
 log4j.appender.out.layout = org.apache.log4j.PatternLayout
 log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
 
-log4j.logger.org.kududb = DEBUG
\ No newline at end of file
+log4j.logger.org.kududb = INFO
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/test/scala/org/kududb/spark/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/kududb/spark/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/kududb/spark/DefaultSourceTest.scala
deleted file mode 100644
index 483b2e2..0000000
--- a/java/kudu-spark/src/test/scala/org/kududb/spark/DefaultSourceTest.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.kududb.spark
-
-import org.apache.spark.sql.SQLContext
-import org.junit.runner.RunWith
-import org.scalatest.FunSuite
-import org.scalatest.junit.JUnitRunner
-
-@RunWith(classOf[JUnitRunner])
-class DefaultSourceTest extends FunSuite with TestContext {
-
-  test("Test basic SparkSQL") {
-    val rowCount = 10
-
-    insertRows(rowCount)
-
-    val sqlContext = new SQLContext(sc)
-
-    sqlContext.load("org.kududb.spark",
-      Map("kudu.table" -> tableName, "kudu.master" -> miniCluster.getMasterAddresses))
-      .registerTempTable(tableName)
-
-    val results = sqlContext.sql("SELECT * FROM " + tableName).collectAsList()
-    assert(results.size() == rowCount)
-
-    assert(results.get(0).isNullAt(2))
-    assert(!results.get(1).isNullAt(2))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/test/scala/org/kududb/spark/KuduContextTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/kududb/spark/KuduContextTest.scala b/java/kudu-spark/src/test/scala/org/kududb/spark/KuduContextTest.scala
deleted file mode 100644
index 67aad7b..0000000
--- a/java/kudu-spark/src/test/scala/org/kududb/spark/KuduContextTest.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.kududb.spark
-
-import org.junit.runner.RunWith
-import org.scalatest.FunSuite
-import org.scalatest.junit.JUnitRunner
-
-@RunWith(classOf[JUnitRunner])
-class KuduContextTest extends FunSuite with TestContext {
-  test("Test basic kuduRDD") {
-    val rowCount = 10
-
-    insertRows(rowCount)
-
-    val scanRdd = kuduContext.kuduRDD(sc, "test")
-
-    val scanList = scanRdd.map(r => r.getInt(0)).collect()
-    assert(scanList.length == rowCount)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/test/scala/org/kududb/spark/TestContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/kududb/spark/TestContext.scala b/java/kudu-spark/src/test/scala/org/kududb/spark/TestContext.scala
deleted file mode 100644
index 9876282..0000000
--- a/java/kudu-spark/src/test/scala/org/kududb/spark/TestContext.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.kududb.spark
-
-import com.google.common.collect.ImmutableList
-import org.apache.spark.SparkContext
-import org.kududb.ColumnSchema.ColumnSchemaBuilder
-import org.kududb.client.KuduClient.KuduClientBuilder
-import org.kududb.client.MiniKuduCluster.MiniKuduClusterBuilder
-import org.kududb.client.{CreateTableOptions, KuduClient, KuduTable, MiniKuduCluster}
-import org.kududb.{Schema, Type}
-import org.scalatest.{BeforeAndAfterAll, Suite}
-
-trait TestContext extends BeforeAndAfterAll { self: Suite =>
-
-  var sc: SparkContext = null
-  var miniCluster: MiniKuduCluster = null
-  var kuduClient: KuduClient = null
-  var table: KuduTable = null
-  var kuduContext: KuduContext = null
-
-  val tableName = "test"
-
-  lazy val schema: Schema = {
-    val columns = ImmutableList.of(
-      new ColumnSchemaBuilder("key", Type.INT32).key(true).build(),
-      new ColumnSchemaBuilder("c1_i", Type.INT32).build(),
-      new ColumnSchemaBuilder("c2_s", Type.STRING).nullable(true).build())
-    new Schema(columns)
-  }
-
-  override def beforeAll() {
-    miniCluster = new MiniKuduClusterBuilder()
-      .numMasters(1)
-      .numTservers(1)
-      .build()
-    val envMap = Map[String,String](("Xmx", "512m"))
-
-    sc = new SparkContext("local[2]", "test", null, Nil, envMap)
-
-    kuduClient = new KuduClientBuilder(miniCluster.getMasterAddresses).build()
-    assert(miniCluster.waitForTabletServers(1))
-
-    kuduContext = new KuduContext(miniCluster.getMasterAddresses)
-
-    val tableOptions = new CreateTableOptions().setNumReplicas(1)
-    table = kuduClient.createTable(tableName, schema, tableOptions)
-  }
-
-  override def afterAll() {
-    if (kuduClient != null) kuduClient.shutdown()
-    if (miniCluster != null) miniCluster.shutdown()
-    if (sc != null) sc.stop()
-  }
-
-  def insertRows(rowCount: Integer) {
-    val kuduSession = kuduClient.newSession()
-
-    for (i <- 1 to rowCount) {
-      val insert = table.newInsert
-      val row = insert.getRow
-      row.addInt(0, i)
-      row.addInt(1, i)
-
-      // Sprinkling some nulls so that queries see them.
-      if (i % 2 == 0) {
-        row.addString(2, i.toString)
-      } else {
-        row.setNull(2)
-      }
-
-      kuduSession.apply(insert)
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala
new file mode 100644
index 0000000..7161ace
--- /dev/null
+++ b/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kududb.spark.kudu
+
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.TimeZone
+
+import org.apache.spark.sql.SQLContext
+import org.junit.Assert._
+import org.junit.runner.RunWith
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.junit.JUnitRunner
+
+import scala.collection.immutable.IndexedSeq
+
+/**
+  * Tests for the Kudu Spark SQL data source: the timestamp conversion helpers and
+  * table scans with projections and predicates over the rows inserted by
+  * `insertRows`.
+  */
+@RunWith(classOf[JUnitRunner])
+class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter {
+
+  test("timestamp conversion") {
+    val epoch = new Timestamp(0)
+    assertEquals(0, KuduRelation.timestampToMicros(epoch))
+    assertEquals(epoch, KuduRelation.microsToTimestamp(0))
+
+    val t1 = new Timestamp(0)
+    t1.setNanos(123456000)
+    assertEquals(123456, KuduRelation.timestampToMicros(t1))
+    assertEquals(t1, KuduRelation.microsToTimestamp(123456))
+
+    val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
+    iso8601.setTimeZone(TimeZone.getTimeZone("UTC"))
+
+    // A pre-epoch instant, so the microsecond value is negative.
+    val t3 = new Timestamp(iso8601.parse("1923-12-01T00:44:36.876").getTime)
+    t3.setNanos(876544000)
+    assertEquals(-1454368523123456L, KuduRelation.timestampToMicros(t3))
+    assertEquals(t3, KuduRelation.microsToTimestamp(-1454368523123456L))
+  }
+
+  val rowCount = 10
+  var sqlContext : SQLContext = _
+  var rows : IndexedSeq[(Int, Int, String)] = _
+
+  // Runs before every test: inserts the fixture rows and registers the Kudu table
+  // as a temp table. Uses the class-level rowCount (no shadowing local).
+  before {
+    rows = insertRows(rowCount)
+
+    sqlContext = new SQLContext(sc)
+
+    sqlContext.read.options(
+      Map("kudu.table" -> tableName, "kudu.master" -> miniCluster.getMasterAddresses)).kudu
+      .registerTempTable(tableName)
+  }
+
+  test("table scan") {
+    val results = sqlContext.sql(s"SELECT * FROM $tableName").collectAsList()
+    assert(results.size() == rowCount)
+
+    assert(!results.get(0).isNullAt(2))
+    assert(results.get(1).isNullAt(2))
+  }
+
+  test("table scan with projection") {
+    assertEquals(rowCount, sqlContext.sql(s"""SELECT key FROM $tableName""").count())
+  }
+
+  // In the predicate tests below the expected count is computed from the rows returned
+  // by insertRows, whose second tuple element is the integer every numeric column is
+  // derived from.
+  test("table scan with projection and predicate double") {
+    assertEquals(rows.count { case (_, i, _) => i > 5 },
+                 sqlContext.sql(s"""SELECT key, c3_double FROM $tableName where c3_double > "5.0"""").count())
+  }
+
+  test("table scan with projection and predicate long") {
+    assertEquals(rows.count { case (_, i, _) => i > 5 },
+                 sqlContext.sql(s"""SELECT key, c4_long FROM $tableName where c4_long > "5"""").count())
+
+  }
+  test("table scan with projection and predicate bool") {
+    // insertRows stores c5_bool as (i % 2 == 1), so true rows are the odd ones.
+    assertEquals(rows.count { case (_, i, _) => i % 2 == 1 },
+                 sqlContext.sql(s"""SELECT key, c5_bool FROM $tableName where c5_bool = true""").count())
+
+  }
+  test("table scan with projection and predicate short") {
+    assertEquals(rows.count { case (_, i, _) => i > 5 },
+                 sqlContext.sql(s"""SELECT key, c6_short FROM $tableName where c6_short > 5""").count())
+
+  }
+  test("table scan with projection and predicate float") {
+    assertEquals(rows.count { case (_, i, _) => i > 5 },
+                 sqlContext.sql(s"""SELECT key, c7_float FROM $tableName where c7_float > 5""").count())
+
+  }
+
+  test("table scan with projection and predicate ") {
+    // The string column is nullable, so the null check here is meaningful.
+    assertEquals(rows.count { case (_, _, s) => s != null && s > "5" },
+      sqlContext.sql(s"""SELECT key FROM $tableName where c2_s > "5"""").count())
+
+    assertEquals(rows.count { case (_, _, s) => s != null },
+      sqlContext.sql(s"""SELECT key, c2_s FROM $tableName where c2_s IS NOT NULL""").count())
+  }
+
+
+  test("Test basic SparkSQL") {
+    val results = sqlContext.sql("SELECT * FROM " + tableName).collectAsList()
+    assert(results.size() == rowCount)
+
+    assert(results.get(1).isNullAt(2))
+    assert(!results.get(0).isNullAt(2))
+  }
+
+  test("Test basic SparkSQL projection") {
+    val results = sqlContext.sql("SELECT key FROM " + tableName).collectAsList()
+    assert(results.size() == rowCount)
+    assert(results.get(0).size.equals(1))
+    assert(results.get(0).getInt(0).equals(0))
+  }
+
+  test("Test basic SparkSQL with predicate") {
+    val results = sqlContext.sql("SELECT key FROM " + tableName + " where key=1").collectAsList()
+    assert(results.size() == 1)
+    assert(results.get(0).size.equals(1))
+    assert(results.get(0).getInt(0).equals(1))
+
+  }
+
+  test("Test basic SparkSQL with two predicates") {
+    val results = sqlContext.sql("SELECT key FROM " + tableName + " where key=2 and c2_s='2'").collectAsList()
+    assert(results.size() == 1)
+    assert(results.get(0).size.equals(1))
+    assert(results.get(0).getInt(0).equals(2))
+  }
+
+  test("Test basic SparkSQL with two predicates negative") {
+    val results = sqlContext.sql("SELECT key FROM " + tableName + " where key=1 and c2_s='2'").collectAsList()
+    assert(results.size() == 0)
+  }
+
+  test("Test basic SparkSQL with two predicates including string") {
+    val results = sqlContext.sql("SELECT key FROM " + tableName + " where c2_s='2'").collectAsList()
+    assert(results.size() == 1)
+    assert(results.get(0).size.equals(1))
+    assert(results.get(0).getInt(0).equals(2))
+  }
+
+  test("Test basic SparkSQL with two predicates and projection") {
+    val results = sqlContext.sql("SELECT key, c2_s FROM " + tableName + " where c2_s='2'").collectAsList()
+    assert(results.size() == 1)
+    assert(results.get(0).size.equals(2))
+    assert(results.get(0).getInt(0).equals(2))
+    assert(results.get(0).getString(1).equals("2"))
+  }
+
+  test("Test basic SparkSQL with two predicates greater than") {
+    val results = sqlContext.sql("SELECT key, c2_s FROM " + tableName + " where c2_s>='2'").collectAsList()
+    assert(results.size() == 4)
+    assert(results.get(0).size.equals(2))
+    assert(results.get(0).getInt(0).equals(2))
+    assert(results.get(0).getString(1).equals("2"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/KuduContextTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/KuduContextTest.scala b/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/KuduContextTest.scala
new file mode 100644
index 0000000..fc2dff5
--- /dev/null
+++ b/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/KuduContextTest.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kududb.spark.kudu
+
+import org.junit.runner.RunWith
+import org.scalatest.FunSuite
+import org.scalatest.junit.JUnitRunner
+
+@RunWith(classOf[JUnitRunner])
+class KuduContextTest extends FunSuite with TestContext {
+  test("Test basic kuduRDD") {
+    val expectedRows = 10
+    insertRows(expectedRows)
+
+    // Project only the key column and collect every key back to the driver.
+    val keys = kuduContext
+      .kuduRDD(sc, "test", Seq("key"))
+      .map(row => row.getInt(0))
+      .collect()
+
+    assert(keys.length == expectedRows)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/TestContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/TestContext.scala b/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/TestContext.scala
new file mode 100644
index 0000000..97a4d39
--- /dev/null
+++ b/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/TestContext.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kududb.spark.kudu
+
+import java.util.Date
+
+import com.google.common.collect.ImmutableList
+import org.apache.spark.{SparkConf, SparkContext}
+import org.kududb.ColumnSchema.ColumnSchemaBuilder
+import org.kududb.client.KuduClient.KuduClientBuilder
+import org.kududb.client.MiniKuduCluster.MiniKuduClusterBuilder
+import org.kududb.client.{CreateTableOptions, KuduClient, KuduTable, MiniKuduCluster}
+import org.kududb.{Schema, Type}
+import org.scalatest.{BeforeAndAfterAll, Suite}
+
+import scala.collection.immutable.IndexedSeq
+
+/**
+  * Shared fixture for the Kudu Spark tests: starts a mini Kudu cluster and a
+  * Spark context once per suite, creates the test table, and offers a helper for
+  * inserting rows into it.
+  */
+trait TestContext extends BeforeAndAfterAll { self: Suite =>
+
+  // Populated in beforeAll and released in afterAll.
+  var sc: SparkContext = null
+  var miniCluster: MiniKuduCluster = null
+  var kuduClient: KuduClient = null
+  var table: KuduTable = null
+  var kuduContext: KuduContext = null
+
+  val tableName = "test"
+
+  // Schema of the test table; only c2_s is nullable.
+  lazy val schema: Schema = {
+    val columns = ImmutableList.of(
+      new ColumnSchemaBuilder("key", Type.INT32).key(true).build(),
+      new ColumnSchemaBuilder("c1_i", Type.INT32).build(),
+      new ColumnSchemaBuilder("c2_s", Type.STRING).nullable(true).build(),
+      new ColumnSchemaBuilder("c3_double", Type.DOUBLE).build(),
+      new ColumnSchemaBuilder("c4_long", Type.INT64).build(),
+      new ColumnSchemaBuilder("c5_bool", Type.BOOL).build(),
+      new ColumnSchemaBuilder("c6_short", Type.INT16).build(),
+      new ColumnSchemaBuilder("c7_float", Type.FLOAT).build())
+    new Schema(columns)
+  }
+
+  // Current date plus a random number, used as the Spark application id.
+  val appID = new Date().toString + math.floor(math.random * 10E4).toLong.toString
+
+  val conf = new SparkConf().
+    setMaster("local[*]").
+    setAppName("test").
+    set("spark.ui.enabled", "false").
+    set("spark.app.id", appID)
+
+  /** Starts the mini cluster and Spark context, then creates the test table. */
+  override def beforeAll(): Unit = {
+    miniCluster = new MiniKuduClusterBuilder()
+      .numMasters(1)
+      .numTservers(1)
+      .build()
+
+    sc = new SparkContext(conf)
+
+    kuduClient = new KuduClientBuilder(miniCluster.getMasterAddresses).build()
+    assert(miniCluster.waitForTabletServers(1))
+
+    kuduContext = new KuduContext(miniCluster.getMasterAddresses)
+
+    val tableOptions = new CreateTableOptions().setNumReplicas(1)
+    table = kuduClient.createTable(tableName, schema, tableOptions)
+  }
+
+  /** Shuts down whatever beforeAll managed to start. */
+  override def afterAll(): Unit = {
+    if (kuduClient != null) kuduClient.shutdown()
+    if (miniCluster != null) miniCluster.shutdown()
+    if (sc != null) sc.stop()
+  }
+
+  /**
+    * Inserts rows with keys `0` until `rowCount` into the test table.
+    *
+    * Every numeric column is derived from the key, `c5_bool` is true for odd keys,
+    * and `c2_s` holds the key as a string for even keys and null for odd keys.
+    *
+    * @param rowCount number of rows to insert
+    * @return one `(key, c1_i, c2_s)` tuple per inserted row, with null for a null string
+    */
+  def insertRows(rowCount: Integer): IndexedSeq[(Int, Int, String)] = {
+    val kuduSession = kuduClient.newSession()
+
+    val rows = Range(0, rowCount).map { i =>
+      val insert = table.newInsert
+      val row = insert.getRow
+      row.addInt(0, i)
+      row.addInt(1, i)
+      row.addDouble(3, i.toDouble)
+      row.addLong(4, i.toLong)
+      row.addBoolean(5, i%2==1)
+      row.addShort(6, i.toShort)
+      row.addFloat(7, i.toFloat)
+
+      // Sprinkling some nulls so that queries see them.
+      val s = if (i % 2 == 0) {
+        row.addString(2, i.toString)
+        i.toString
+      } else {
+        row.setNull(2)
+        null
+      }
+
+      kuduSession.apply(insert)
+      (i, i, s)
+    }
+    rows
+  }
+}
\ No newline at end of file


Mime
View raw message