kudu-commits mailing list archives

From danburk...@apache.org
Subject incubator-kudu git commit: Kudu Spark Datasource for insertions and updates
Date Thu, 19 May 2016 18:48:52 GMT
Repository: incubator-kudu
Updated Branches:
  refs/heads/master 591c9ccb2 -> 069a38fb4


Kudu Spark Datasource for insertions and updates

Change-Id: I74f20a6c17c47f424dfda62854963dc19c3b78c3
Reviewed-on: http://gerrit.cloudera.org:8080/2992
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <dan@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/069a38fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/069a38fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/069a38fb

Branch: refs/heads/master
Commit: 069a38fb4e7728023542309521db9aa6e5cb2aee
Parents: 591c9cc
Author: cgeorge <chris.george@rms.com>
Authored: Fri May 6 16:43:47 2016 -0600
Committer: Dan Burkert <dan@cloudera.com>
Committed: Thu May 19 18:48:10 2016 +0000

----------------------------------------------------------------------
 docs/developing.adoc                            |  18 ++-
 docs/release_notes.adoc                         |   2 +
 .../org/kududb/spark/kudu/DefaultSource.scala   |  65 +++++++--
 .../org/kududb/spark/kudu/KuduContext.scala     | 132 ++++++++++++++++++-
 .../scala/org/kududb/spark/kudu/package.scala   |  10 +-
 .../kududb/spark/kudu/DefaultSourceTest.scala   |  97 +++++++++++++-
 .../org/kududb/spark/kudu/TestContext.scala     |   7 +
 7 files changed, 312 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/069a38fb/docs/developing.adoc
----------------------------------------------------------------------
diff --git a/docs/developing.adoc b/docs/developing.adoc
index 89ee517..42b1110 100644
--- a/docs/developing.adoc
+++ b/docs/developing.adoc
@@ -111,11 +111,27 @@ Then import kudu-spark and create a dataframe:
 import org.kududb.spark.kudu._
 val kuduDataFrame =  sqlContext.read.options(Map("kudu.master"-> "your.kudu.master.here","kudu.table"-> "your.kudu.table.here")).kudu
 // Then query using spark api or register a temporary table and use spark sql
-scala> kuduDataFrame.select("id").filter("id">=5).show()
+kuduDataFrame.select("id").filter("id">=5).show()
 // Register kuduDataFrame as a temporary table for spark-sql
 kuduDataFrame.registerTempTable("kudu_table")
 // Select from the dataframe
 sqlContext.sql("select id from kudu_table where id>=5").show()
+
+// create a new kudu table from a dataframe
+val kuduContext = new KuduContext("your.kudu.master.here")
+kuduContext.createTable("testcreatetable", df.schema, Seq("key"), new CreateTableOptions().setNumReplicas(1))
+
+// then we can insert data into the kudu table
+df.write.options(Map("kudu.master"-> "your.kudu.master.here","kudu.table"-> "your.kudu.table.here")).mode("append").kudu
+
+// to update existing data change the mode to 'overwrite'
+df.write.options(Map("kudu.master"-> "your.kudu.master.here","kudu.table"-> "your.kudu.table.here")).mode("overwrite").kudu
+
+// to check for existence of a kudu table
+kuduContext.tableExists("your.kudu.table.here")
+
+// to delete a kudu table
+kuduContext.deleteTable("your.kudu.table.here")
 ----
 
 == Integration with MapReduce, YARN, and Other Frameworks

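For reference, the documented snippets above combine into the following self-contained sketch. The master address and table names are the same placeholders used in the docs, and `sc` is assumed to be an existing SparkContext; this is illustrative only, not part of the commit.

import org.apache.spark.sql.SQLContext
import org.kududb.client.CreateTableOptions
import org.kududb.spark.kudu._

val sqlContext = new SQLContext(sc)
val kuduMaster = "your.kudu.master.here"

// Read an existing Kudu table into a DataFrame.
val df = sqlContext.read
  .options(Map("kudu.master" -> kuduMaster, "kudu.table" -> "your.kudu.table.here"))
  .kudu

// Create a new table with the same schema, keyed on "key", with a single replica.
val kuduContext = new KuduContext(kuduMaster)
if (!kuduContext.tableExists("testcreatetable")) {
  kuduContext.createTable("testcreatetable", df.schema, Seq("key"),
    new CreateTableOptions().setNumReplicas(1))
}

// "append" inserts the rows; "overwrite" updates existing rows instead.
df.write
  .options(Map("kudu.master" -> kuduMaster, "kudu.table" -> "testcreatetable"))
  .mode("append")
  .kudu
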
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/069a38fb/docs/release_notes.adoc
----------------------------------------------------------------------
diff --git a/docs/release_notes.adoc b/docs/release_notes.adoc
index e0ae844..b3889c0 100644
--- a/docs/release_notes.adoc
+++ b/docs/release_notes.adoc
@@ -77,6 +77,8 @@ Hadoop storage technologies.
   using mapreduce api. Includes predicate pushdowns for spark-sql and spark filters.
  Parallel retrieval for multiple tablets and column projections. link:developing.html#_kudu_integration_with_spark[Kudu integration with Spark Example]
 
+- link:http://gerrit.cloudera.org:8080/#/c/2992/ Added ability to update and insert from Spark using the Kudu datasource
+
 [[rn_0.9.0_changes]]
 ==== Other noteworthy changes
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/069a38fb/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala
index 6226862..95145d5 100644
--- a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala
@@ -22,11 +22,13 @@ import java.sql.Timestamp
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
 import org.kududb.Type
 import org.kududb.annotations.InterfaceStability
-import org.kududb.client.{KuduPredicate, KuduTable}
+import org.kududb.client._
 import org.kududb.client.KuduPredicate.ComparisonOp
+import org.kududb.client.SessionConfiguration.FlushMode
+import org.apache.spark.sql.SaveMode._
 
 import scala.collection.JavaConverters._
 
@@ -35,7 +37,7 @@ import scala.collection.JavaConverters._
   * This class will produce a relationProvider based on input given to it from spark.
   */
 @InterfaceStability.Unstable
-class DefaultSource extends RelationProvider {
+class DefaultSource extends RelationProvider with CreatableRelationProvider {
 
   val TABLE_KEY = "kudu.table"
   val KUDU_MASTER = "kudu.master"
@@ -50,14 +52,42 @@ class DefaultSource extends RelationProvider {
   override def createRelation(sqlContext: SQLContext,
                               parameters: Map[String, String]):
   BaseRelation = {
-    val tableName = parameters.get(TABLE_KEY)
-    if (tableName.isEmpty) {
-      throw new IllegalArgumentException(s"Invalid value for $TABLE_KEY '$tableName'")
-    }
+    val tableName = parameters.getOrElse(TABLE_KEY,
+      throw new IllegalArgumentException(s"Kudu table name must be specified in create options " +
+        s"using key '$TABLE_KEY'"))
+    val kuduMaster = parameters.getOrElse(KUDU_MASTER, "localhost")
+
+    new KuduRelation(tableName, kuduMaster)(sqlContext)
+  }
+
+  /**
+    * Creates a relation and inserts data into the specified table.
+    *
+    * @param sqlContext
+    * @param mode Append will not overwrite existing data; Overwrite will perform updates but will
+    *             not insert data. Use upsert on KuduContext if you require both.
+    * @param parameters Necessary parameters: kudu.table and kudu.master
+    * @param data DataFrame to save into Kudu
+    * @return the populated base relation
+    */
+  override def createRelation(sqlContext: SQLContext, mode: SaveMode,
+                              parameters: Map[String, String], data: DataFrame): BaseRelation = {
+    val tableName = parameters.getOrElse(TABLE_KEY,
+      throw new IllegalArgumentException(s"Kudu table name must be specified in create options " +
+        s"using key '$TABLE_KEY'"))
 
     val kuduMaster = parameters.getOrElse(KUDU_MASTER, "localhost")
 
-    new KuduRelation(tableName.get, kuduMaster)(sqlContext)
+    val kuduRelation = new KuduRelation(tableName, kuduMaster)(sqlContext)
+    mode match {
+      case Append | Ignore => kuduRelation.insert(data, overwrite = false)
+      case Overwrite => kuduRelation.insert(data, overwrite = true)
+      case ErrorIfExists =>
+          throw new UnsupportedOperationException(
+            "ErrorIfExists is currently not supported")
+    }
+
+    kuduRelation
   }
 }
 
@@ -73,7 +103,9 @@ class KuduRelation(private val tableName: String,
                    private val kuduMaster: String)(
                    val sqlContext: SQLContext)
 extends BaseRelation
-with PrunedFilteredScan {
+with PrunedFilteredScan
+with InsertableRelation {
+
   import KuduRelation._
 
   private val context: KuduContext = new KuduContext(kuduMaster)
@@ -103,12 +135,12 @@ with PrunedFilteredScan {
     *
     * @param requiredColumns columns that are being requested by the requesting query
     * @param filters         filters that are being applied by the requesting query
-    * @return                RDD with all the results from Kudu
+    * @return RDD with all the results from Kudu
     */
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
     val predicates = filters.flatMap(filterToPredicate)
-    new KuduRDD(kuduMaster, 1024*1024*20, requiredColumns, predicates,
-                table, context, sqlContext.sparkContext)
+    new KuduRDD(kuduMaster, 1024 * 1024 * 20, requiredColumns, predicates,
+      table, context, sqlContext.sparkContext)
   }
 
   /**
@@ -159,6 +191,15 @@ with PrunedFilteredScan {
      case value: Array[Byte] => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
     }
   }
+
+  /**
+    * Inserts data into an existing Kudu table.
+    * @param data [[DataFrame]] to be inserted into Kudu
+    * @param overwrite If True it will update existing records, but will not perform inserts.
+    */
+  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+    context.writeRows(data, tableName, overwrite)
+  }
 }
 
 private[spark] object KuduRelation {

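Viewed from the caller's side, the SaveMode handling added above behaves as sketched below. `df` and `kuduOptions` stand for a DataFrame and the kudu.master/kudu.table options as in the docs example; this is a sketch, not code from the commit.

import org.apache.spark.sql.SaveMode
import org.kududb.spark.kudu._

// Append (and Ignore) write the rows as Kudu inserts; duplicate keys are skipped
// because the write session ignores duplicate-row errors.
df.write.options(kuduOptions).mode(SaveMode.Append).kudu

// Overwrite writes the rows as Kudu updates; rows whose keys do not already exist
// are not inserted.
df.write.options(kuduOptions).mode(SaveMode.Overwrite).kudu

// ErrorIfExists is rejected with an UnsupportedOperationException.
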
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/069a38fb/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala
index 47984b2..e230acd 100644
--- a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala
@@ -17,12 +17,19 @@
 
 package org.kududb.spark.kudu
 
+import java.util
 import org.apache.hadoop.util.ShutdownHookManager
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.types.{StructField, StructType, DataType, DataTypes}
+import org.kududb.{ColumnSchema, Schema, Type}
 import org.kududb.annotations.InterfaceStability
-import org.kududb.client.{AsyncKuduClient, KuduClient}
+import org.kududb.client.SessionConfiguration.FlushMode
+import org.kududb.client._
+import scala.collection.JavaConverters._
+import java.sql.Timestamp
+
 
 /**
   * KuduContext is a serializable container for Kudu client connections.
@@ -75,4 +82,123 @@ class KuduContext(kuduMaster: String) extends Serializable {
     new KuduRDD(kuduMaster, 1024*1024*20, columnProjection.toArray, Array(),
                 syncClient.openTable(tableName), this, sc)
   }
-}
+
+  /**
+    * Check if kudu table already exists
+    * @param tableName tablename to check
+    * @return true if table exists, false if table does not exist
+    */
+  def tableExists(tableName: String): Boolean = syncClient.tableExists(tableName)
+
+  /**
+    * Delete kudu table
+    * @param tableName tablename to delete
+    * @return DeleteTableResponse
+    */
+  def deleteTable(tableName: String): DeleteTableResponse = syncClient.deleteTable(tableName)
+
+  /**
+    * Creates a kudu table for the given schema. Partitioning can be specified through options.
+    * @param tableName table to create
+    * @param schema struct schema of table
+    * @param keys primary keys of the table
+    * @param options replication and partitioning options for the table
+    */
+  def createTable(tableName: String,
+                  schema: StructType,
+                  keys: Seq[String],
+                  options: CreateTableOptions): KuduTable = {
+    val kuduCols = new util.ArrayList[ColumnSchema]()
+    // add the key columns first, in the order specified
+    for (key <- keys) {
+      val f = schema.fields(schema.fieldIndex(key))
+      kuduCols.add(new ColumnSchema.ColumnSchemaBuilder(f.name, kuduType(f.dataType)).key(true).build())
+    }
+    // now add the non-key columns
+    for (f <- schema.fields.filter(field=> !keys.contains(field.name))) {
+      kuduCols.add(new ColumnSchema.ColumnSchemaBuilder(f.name, kuduType(f.dataType)).nullable(f.nullable).key(false).build())
+    }
+
+    syncClient.createTable(tableName, new Schema(kuduCols), options)
+  }
+
+  /** Map Spark SQL type to Kudu type */
+  def kuduType(dt: DataType) : Type = dt match {
+    case DataTypes.BinaryType => Type.BINARY
+    case DataTypes.BooleanType => Type.BOOL
+    case DataTypes.StringType => Type.STRING
+    case DataTypes.TimestampType => Type.TIMESTAMP
+    case DataTypes.ByteType => Type.INT8
+    case DataTypes.ShortType => Type.INT16
+    case DataTypes.IntegerType => Type.INT32
+    case DataTypes.LongType => Type.INT64
+    case DataTypes.FloatType => Type.FLOAT
+    case DataTypes.DoubleType => Type.DOUBLE
+    case _ => throw new IllegalArgumentException(s"No support for Spark SQL type $dt")
+  }
+
+  /**
+    * Inserts or updates rows in kudu from a [[DataFrame]].
+    * @param data `DataFrame` to insert/update
+    * @param tableName table to perform insertion on
+    * @param overwrite true=update, false=insert
+    */
+  def writeRows(data: DataFrame, tableName: String, overwrite: Boolean) {
+    val schema = data.schema
+    data.foreachPartition(iterator => {
+      val pendingErrors = writeRows(iterator, schema, tableName, overwrite)
+      val errorCount = pendingErrors.getRowErrors.length
+      if (errorCount > 0) {
+        val errors = pendingErrors.getRowErrors.take(5).map(_.getErrorStatus).mkString
+        throw new RuntimeException(
+          s"failed to write $errorCount rows from DataFrame to Kudu; sample errors: $errors")
+      }
+    })
+  }
+
+  /**
+    * Saves partitions of a [[DataFrame]] into Kudu.
+    * @param rows rows to insert or update
+    * @param tableName table to insert or update on
+    */
+  def writeRows(rows: Iterator[Row],
+                schema: StructType,
+                tableName: String,
+                performAsUpdate : Boolean = false): RowErrorsAndOverflowStatus = {
+    val table: KuduTable = syncClient.openTable(tableName)
+    val kuduSchema = table.getSchema
+    val indices: Array[(Int, Int)] = schema.fields.zipWithIndex.map({ case (field, sparkIdx) =>
+      sparkIdx -> table.getSchema.getColumnIndex(field.name)
+    })
+    val session: KuduSession = syncClient.newSession
+    session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
+    session.setIgnoreAllDuplicateRows(true)
+    try {
+      for (row <- rows) {
+        val operation = if (performAsUpdate) { table.newUpdate() } else { table.newInsert() }
+        for ((sparkIdx, kuduIdx) <- indices) {
+          if (row.isNullAt(sparkIdx)) {
+            operation.getRow.setNull(kuduIdx)
+          } else schema.fields(sparkIdx).dataType match {
+            case DataTypes.StringType => operation.getRow.addString(kuduIdx, row.getString(sparkIdx))
+            case DataTypes.BinaryType => operation.getRow.addBinary(kuduIdx, row.getAs[Array[Byte]](sparkIdx))
+            case DataTypes.BooleanType => operation.getRow.addBoolean(kuduIdx, row.getBoolean(sparkIdx))
+            case DataTypes.ByteType => operation.getRow.addByte(kuduIdx, row.getByte(sparkIdx))
+            case DataTypes.ShortType => operation.getRow.addShort(kuduIdx, row.getShort(sparkIdx))
+            case DataTypes.IntegerType => operation.getRow.addInt(kuduIdx, row.getInt(sparkIdx))
+            case DataTypes.LongType => operation.getRow.addLong(kuduIdx, row.getLong(sparkIdx))
+            case DataTypes.FloatType => operation.getRow.addFloat(kuduIdx, row.getFloat(sparkIdx))
+            case DataTypes.DoubleType => operation.getRow.addDouble(kuduIdx, row.getDouble(sparkIdx))
+            case DataTypes.TimestampType => operation.getRow.addLong(kuduIdx, KuduRelation.timestampToMicros(row.getTimestamp(sparkIdx)))
+            case t => throw new IllegalArgumentException(s"No support for Spark SQL type $t")
+          }
+        }
+        session.apply(operation)
+      }
+    } finally {
+      session.close()
+    }
+    session.getPendingErrors
+  }
+
+}
\ No newline at end of file

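A minimal sketch of driving the KuduContext API above directly, assuming a reachable master and an existing `sc`/`sqlContext`; the schema, table name, and rows are invented for illustration.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.kududb.client.CreateTableOptions
import org.kududb.spark.kudu.KuduContext

val kuduContext = new KuduContext("your.kudu.master.here")

// Spark SQL schema; kuduType() maps IntegerType to INT32 and StringType to STRING.
val schema = StructType(Seq(
  StructField("key", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true)))

// Key columns are added first and marked key(true); the remaining columns keep their nullability.
kuduContext.createTable("example_table", schema, Seq("key"),
  new CreateTableOptions().setNumReplicas(1))

// writeRows with overwrite = false issues inserts; overwrite = true issues updates.
val df = sqlContext.createDataFrame(
  sc.parallelize(Seq(Row(1, "a"), Row(2, "b"))), schema)
kuduContext.writeRows(df, "example_table", overwrite = false)
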
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/069a38fb/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala
index 29ba455..4203e31 100755
--- a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala
+++ b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala
@@ -16,7 +16,7 @@
  */
 package org.kududb.spark
 
-import org.apache.spark.sql.{DataFrame, DataFrameReader}
+import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter}
 
 package object kudu {
 
@@ -27,4 +27,12 @@ package object kudu {
   implicit class KuduDataFrameReader(reader: DataFrameReader) {
     def kudu: DataFrame = reader.format("org.kududb.spark.kudu").load
   }
+
+  /**
+    * Adds a method, `kudu`, to DataFrameWriter that allows writes to Kudu using
+    * the DataFrameWriter
+    */
+    implicit class KuduDataFrameWriter(writer: DataFrameWriter) {
+      def kudu = writer.format("org.kududb.spark.kudu").save
+    }
 }

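The implicit classes above are thin wrappers around format/load and format/save; given the `kuduOptions` and `df` from the earlier sketches, the `.kudu` shorthand is roughly equivalent to the explicit calls below.

import org.kududb.spark.kudu._

// Reader: the two lines are equivalent.
val fromShorthand = sqlContext.read.options(kuduOptions).kudu
val fromExplicit  = sqlContext.read.options(kuduOptions).format("org.kududb.spark.kudu").load

// Writer: the two lines are equivalent.
df.write.options(kuduOptions).mode("append").kudu
df.write.options(kuduOptions).mode("append").format("org.kududb.spark.kudu").save
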
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/069a38fb/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala
index 7161ace..19295e9 100644
--- a/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala
@@ -23,8 +23,10 @@ import java.util.TimeZone
 import org.apache.spark.sql.SQLContext
 import org.junit.Assert._
 import org.junit.runner.RunWith
+import org.kududb.client.CreateTableOptions
 import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.scalatest.junit.JUnitRunner
+import org.apache.spark.sql.functions._
 
 import scala.collection.immutable.IndexedSeq
 
@@ -53,17 +55,108 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter {
   val rowCount = 10
   var sqlContext : SQLContext = _
   var rows : IndexedSeq[(Int, Int, String)] = _
+  var kuduOptions : Map[String, String] = _
+
   before {
     val rowCount = 10
     rows = insertRows(rowCount)
 
     sqlContext = new SQLContext(sc)
 
-    sqlContext.read.options(
-      Map("kudu.table" -> tableName, "kudu.master" -> miniCluster.getMasterAddresses)).kudu
+    kuduOptions = Map(
+      "kudu.table" -> tableName,
+      "kudu.master" -> miniCluster.getMasterAddresses)
+
+    sqlContext.read.options(kuduOptions).kudu
       .registerTempTable(tableName)
   }
 
+  test("table creation") {
+    if(kuduContext.tableExists("testcreatetable")) {
+      kuduContext.deleteTable("testcreatetable")
+    }
+
+    val df = sqlContext.read.options(kuduOptions).kudu
+
+    kuduContext.createTable("testcreatetable", df.schema, Seq("key"), new CreateTableOptions().setNumReplicas(1))
+
+    // now use new options to refer to the new table name
+    val newOptions: Map[String, String] = Map(
+      "kudu.table" -> "testcreatetable",
+      "kudu.master" -> miniCluster.getMasterAddresses)
+
+    df.write.options(newOptions).mode("append").kudu
+
+    val checkDf = sqlContext.read.options(newOptions).kudu
+
+    assert(checkDf.schema === df.schema)
+
+    assertTrue(kuduContext.tableExists("testcreatetable"))
+    assert(checkDf.count == 10)
+    kuduContext.deleteTable("testcreatetable")
+
+    assertFalse(kuduContext.tableExists("testcreatetable"))
+  }
+
+  test("insertion") {
+    val df = sqlContext.read.options(kuduOptions).kudu
+    val changedDF = df.limit(1).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("abc"))
+    changedDF.show
+    changedDF.write.options(kuduOptions).mode("append").kudu
+
+    val newDF = sqlContext.read.options(kuduOptions).kudu
+    newDF.show
+    val collected = newDF.filter("key = 100").collect()
+    assertEquals("abc", collected(0).getAs[String]("c2_s"))
+
+    deleteRow(100)
+  }
+
+  test("insertion multiple") {
+    val df = sqlContext.read.options(kuduOptions).kudu
+    val changedDF = df.limit(2).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("abc"))
+    changedDF.show
+    changedDF.write.options(kuduOptions).mode("append").kudu
+
+    val newDF = sqlContext.read.options(kuduOptions).kudu
+    newDF.show
+    val collected = newDF.filter("key = 100").collect()
+    assertEquals("abc", collected(0).getAs[String]("c2_s"))
+
+    val collectedTwo = newDF.filter("key = 101").collect()
+    assertEquals("abc", collectedTwo(0).getAs[String]("c2_s"))
+
+    deleteRow(100)
+    deleteRow(101)
+  }
+
+  test("update row") {
+    val df = sqlContext.read.options(kuduOptions).kudu
+    val baseDF = df.limit(1) // filter down to just the first row
+    baseDF.show
+    // change the c2 string to abc and update
+    val changedDF = baseDF.withColumn("c2_s", lit("abc"))
+    changedDF.show
+    changedDF.write.options(kuduOptions).mode("overwrite").kudu
+
+    //read the data back
+    val newDF = sqlContext.read.options(kuduOptions).kudu
+    newDF.show
+    val collected = newDF.filter("key = 0").collect()
+    assertEquals("abc", collected(0).getAs[String]("c2_s"))
+
+    //rewrite the original value
+    baseDF.withColumn("c2_s", lit("0")).write.options(kuduOptions)
+      .mode("overwrite").kudu
+  }
+
+  test("out of order selection") {
+    val df = sqlContext.read.options(kuduOptions).kudu.select( "c2_s", "c1_i", "key")
+    val collected = df.collect()
+    assert(collected(0).getString(0).equals("0"))
+
+  }
+
   test("table scan") {
     val results = sqlContext.sql(s"SELECT * FROM $tableName").collectAsList()
     assert(results.size() == rowCount)

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/069a38fb/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/TestContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/TestContext.scala b/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/TestContext.scala
index 97a4d39..1613251 100644
--- a/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/TestContext.scala
+++ b/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/TestContext.scala
@@ -84,6 +84,13 @@ trait TestContext extends BeforeAndAfterAll { self: Suite =>
     if (sc != null) sc.stop()
   }
 
+  def deleteRow(key: Int): Unit = {
+    val kuduSession = kuduClient.newSession()
+    val delete = table.newDelete()
+    delete.getRow.addInt(0, key)
+    kuduSession.apply(delete)
+  }
+
   def insertRows(rowCount: Integer): IndexedSeq[(Int, Int, String)] = {
     val kuduSession = kuduClient.newSession()
 

