From: danburkert@apache.org
To: commits@kudu.apache.org
Date: Thu, 18 Aug 2016 19:27:27 -0000
Subject: [3/3] kudu git commit: [spark] Add insert-ignore, update, and delete as write options

[spark] Add insert-ignore, update, and delete as write options

Change-Id: I2781104c8a655da0287977b93188e9a65e7d68bb
Reviewed-on: http://gerrit.cloudera.org:8080/4016
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley
Reviewed-by: Todd Lipcon

Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9acba45a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9acba45a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9acba45a

Branch: refs/heads/master
Commit: 9acba45ac147fbae5349b7bc559a75ca41c4084b
Parents: 8e09910
Author: Dan Burkert
Authored: Mon Aug 15 18:51:37 2016 -0700
Committer: Dan Burkert
Committed: Thu Aug 18 19:26:55 2016 +0000
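A rough usage sketch of the new write option (illustrative only, not part of the
patch: the DataFrame, master address, and table name are made up; the
"kudu.master"/"kudu.table" option keys are assumed to be the values behind the
KUDU_MASTER/TABLE_KEY constants referenced in DefaultSource below, and
"kudu.operation" is the key documented in KuduRelation.insert):

    // Hypothetical example: write a DataFrame using the new "kudu.operation" option.
    df.write
      .format("org.apache.kudu.spark.kudu")
      .option("kudu.master", "master-host:7051")  // assumed key for the Kudu master addresses
      .option("kudu.table", "my_table")           // assumed key for the target table
      .option("kudu.operation", "insert-ignore")  // insert, insert-ignore, upsert (default), update, or delete
      .mode("append")                             // only SaveMode.Append is supported
      .save()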
----------------------------------------------------------------------
 .../apache/kudu/spark/kudu/DefaultSource.scala | 60 +++++++++++---------
 .../apache/kudu/spark/kudu/KuduContext.scala   | 40 ++++++++-----
 .../org/apache/kudu/spark/kudu/KuduRDD.scala   | 10 ++--
 .../apache/kudu/spark/kudu/OperationType.scala | 44 ++++++++++++++
 .../org/apache/kudu/spark/kudu/package.scala   |  2 +-
 .../kudu/spark/kudu/DefaultSourceTest.scala    | 23 ++++++++
 6 files changed, 131 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/9acba45a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index a3384b2..286a75c 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -27,13 +27,16 @@
 import org.apache.kudu.Type
 import org.apache.kudu.annotations.InterfaceStability
 import org.apache.kudu.client._
 import org.apache.kudu.client.KuduPredicate.ComparisonOp
-import org.apache.spark.sql.SaveMode._
 
 import scala.collection.JavaConverters._
 
 /**
- * DefaultSource for integration with Spark's dataframe datasources.
- * This class will produce a relationProvider based on input given to it from Spark.
+ * Data source for integration with Spark's [[DataFrame]] API.
+ *
+ * Serves as a factory for [[KuduRelation]] instances for Spark. Spark will
+ * automatically look for a [[RelationProvider]] implementation named
+ * `DefaultSource` when the user specifies the path of a source during DDL
+ * operations through [[org.apache.spark.sql.DataFrameReader.format]].
  */
 @InterfaceStability.Unstable
 class DefaultSource extends RelationProvider with CreatableRelationProvider {
@@ -53,16 +56,21 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
                               parameters: Map[String, String]):
   BaseRelation = {
     val tableName = parameters.getOrElse(TABLE_KEY,
-      throw new IllegalArgumentException(s"Kudu table name must be specified in create options " +
-        s"using key '$TABLE_KEY'"))
+      throw new IllegalArgumentException(
+        s"Kudu table name must be specified in create options using key '$TABLE_KEY'"))
     val kuduMaster = parameters.getOrElse(KUDU_MASTER, "localhost")
-    val upsert = parameters.getOrElse(OPERATION, "upsert").toLowerCase match {
-      case "upsert" => true
-      case "insert" => false
-      case _ => throw new UnsupportedOperationException(s"$OPERATION must be upsert or insert")
+
+    val opParam = parameters.getOrElse(OPERATION, "upsert")
+    val operationType = opParam.toLowerCase match {
+      case "insert" => Insert
+      case "insert-ignore" => InsertIgnore
+      case "upsert" => Upsert
+      case "update" => Update
+      case "delete" => Delete
+      case _ => throw new IllegalArgumentException(s"Unsupported operation type '$opParam'")
     }
-    new KuduRelation(tableName, kuduMaster, upsert)(sqlContext)
+    new KuduRelation(tableName, kuduMaster, operationType)(sqlContext)
   }
 
   /**
@@ -79,7 +87,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
                               parameters: Map[String, String], data: DataFrame): BaseRelation = {
     val kuduRelation = createRelation(sqlContext, parameters)
     mode match {
-      case Append => kuduRelation.asInstanceOf[KuduRelation].insert(data, false)
+      case SaveMode.Append => kuduRelation.asInstanceOf[KuduRelation].insert(data, false)
       case _ => throw new UnsupportedOperationException(
         "Currently, only Append is supported")
     }
@@ -92,14 +100,14 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
  * Implementation of Spark BaseRelation.
 *
  * @param tableName Kudu table that we plan to read from
- * @param kuduMaster Kudu master addresses
- * @param upsert Whether the relation will be inserted or upserted by default
+ * @param masterAddrs Kudu master addresses
+ * @param operationType The default operation type to perform when writing to the relation
  * @param sqlContext SparkSQL context
  */
 @InterfaceStability.Unstable
 class KuduRelation(private val tableName: String,
-                   private val kuduMaster: String,
-                   private val upsert: Boolean)(
+                   private val masterAddrs: String,
+                   private val operationType: OperationType)(
                    val sqlContext: SQLContext)
 extends BaseRelation
 with PrunedFilteredScan
@@ -107,7 +115,7 @@ with InsertableRelation {
 
   import KuduRelation._
 
-  private val context: KuduContext = new KuduContext(kuduMaster)
+  private val context: KuduContext = new KuduContext(masterAddrs)
   private val table: KuduTable = context.syncClient.openTable(tableName)
 
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
@@ -123,7 +131,7 @@ with InsertableRelation {
     val fields: Array[StructField] =
       table.getSchema.getColumns.asScala.map { columnSchema =>
         val sparkType = kuduTypeToSparkType(columnSchema.getType)
-        new StructField(columnSchema.getName, sparkType, columnSchema.isNullable)
+        StructField(columnSchema.getName, sparkType, columnSchema.isNullable)
       }.toArray
 
     new StructType(fields)
@@ -138,8 +146,8 @@ with InsertableRelation {
    */
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
     val predicates = filters.flatMap(filterToPredicate)
-    new KuduRDD(kuduMaster, 1024 * 1024 * 20, requiredColumns, predicates,
-      table, context, sqlContext.sparkContext)
+    new KuduRDD(masterAddrs, 1024 * 1024 * 20, requiredColumns, predicates,
+                table, context, sqlContext.sparkContext)
   }
 
   /**
@@ -192,21 +200,19 @@ with InsertableRelation {
   }
 
   /**
-   * By default, upserts data into an existing Kudu table.
-   * If the kudu.upsert parameter is set to false, data is inserted instead of upserted.
+   * Writes data into an existing Kudu table.
+   *
+   * If the `kudu.operation` parameter is set, the data will use that operation
+   * type. If the parameter is unset, the data will be upserted.
   *
   * @param data [[DataFrame]] to be inserted into Kudu
   * @param overwrite must be false; otherwise, throws [[UnsupportedOperationException]]
   */
  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    if (overwrite) {
-      throw new UnsupportedOperationException("overwrite is not supported")
-    }
-    if (upsert) {
-      context.upsertRows(data, tableName)
-    } else {
-      context.insertRows(data, tableName)
+      throw new UnsupportedOperationException("overwrite is not yet supported")
    }
+    context.writeRows(data, tableName, operationType)
  }
 }
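Per the new DefaultSource scaladoc above, Spark locates the provider through
[[org.apache.spark.sql.DataFrameReader.format]]. A minimal read-path sketch
(option keys and names assumed as in the earlier example):

    // Hypothetical example: load a Kudu table as a DataFrame through DefaultSource.
    val df = sqlContext.read
      .format("org.apache.kudu.spark.kudu")
      .option("kudu.master", "master-host:7051")  // assumed key
      .option("kudu.table", "my_table")           // assumed key
      .load()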
http://git-wip-us.apache.org/repos/asf/kudu/blob/9acba45a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index ae463b7..0fa9df9 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -129,17 +129,18 @@ class KuduContext(kuduMaster: String) extends Serializable {
    * @param tableName the Kudu table to insert into
    */
   def insertRows(data: DataFrame, tableName: String): Unit = {
-    writeRows(data, tableName, table => table.newInsert())
+    writeRows(data, tableName, Insert)
   }
 
   /**
-   * Updates a Kudu table with the rows of a [[DataFrame]].
+   * Inserts the rows of a [[DataFrame]] into a Kudu table, ignoring any new
+   * rows that have a primary key conflict with existing rows.
    *
-   * @param data the data to update into Kudu
-   * @param tableName the Kudu table to update
+   * @param data the data to insert into Kudu
+   * @param tableName the Kudu table to insert into
    */
-  def updateRows(data: DataFrame, tableName: String): Unit = {
-    writeRows(data, tableName, table => table.newUpdate())
+  def insertIgnoreRows(data: DataFrame, tableName: String): Unit = {
+    writeRows(data, tableName, InsertIgnore)
   }
 
   /**
@@ -149,7 +150,17 @@
    * @param tableName the Kudu table to upsert into
    */
   def upsertRows(data: DataFrame, tableName: String): Unit = {
-    writeRows(data, tableName, table => table.newUpsert())
+    writeRows(data, tableName, Upsert)
+  }
+
+  /**
+   * Updates a Kudu table with the rows of a [[DataFrame]].
+   *
+   * @param data the data to update into Kudu
+   * @param tableName the Kudu table to update
+   */
+  def updateRows(data: DataFrame, tableName: String): Unit = {
+    writeRows(data, tableName, Update)
   }
 
   /**
@@ -157,16 +168,16 @@
    *
    * @param data the data to delete from Kudu
    *             note that only the key columns should be specified for deletes
-   * @param tableName
+   * @param tableName The Kudu tabe to delete from
    */
   def deleteRows(data: DataFrame, tableName: String): Unit = {
-    writeRows(data, tableName, table => table.newDelete())
+    writeRows(data, tableName, Delete)
   }
 
-  private def writeRows(data: DataFrame, tableName: String, newOp: KuduTable => Operation) {
+  private[kudu] def writeRows(data: DataFrame, tableName: String, operation: OperationType) {
     val schema = data.schema
     data.foreachPartition(iterator => {
-      val pendingErrors = writePartitionRows(iterator, schema, tableName, newOp)
+      val pendingErrors = writePartitionRows(iterator, schema, tableName, operation)
       val errorCount = pendingErrors.getRowErrors.length
       if (errorCount > 0) {
         val errors = pendingErrors.getRowErrors.take(5).map(_.getErrorStatus).mkString
@@ -179,17 +190,17 @@
   private def writePartitionRows(rows: Iterator[Row],
                                  schema: StructType,
                                  tableName: String,
-                                 newOp: KuduTable => Operation): RowErrorsAndOverflowStatus = {
+                                 operationType: OperationType): RowErrorsAndOverflowStatus = {
     val table: KuduTable = syncClient.openTable(tableName)
-    val kuduSchema = table.getSchema
     val indices: Array[(Int, Int)] = schema.fields.zipWithIndex.map({ case (field, sparkIdx) =>
       sparkIdx -> table.getSchema.getColumnIndex(field.name)
     })
     val session: KuduSession = syncClient.newSession
     session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
+    session.setIgnoreAllDuplicateRows(operationType.ignoreDuplicateRowErrors)
     try {
       for (row <- rows) {
-        val operation = newOp(table)
+        val operation = operationType.operation(table)
         for ((sparkIdx, kuduIdx) <- indices) {
           if (row.isNullAt(sparkIdx)) {
             operation.getRow.setNull(kuduIdx)
@@ -214,7 +225,6 @@
     }
     session.getPendingErrors
   }
-
 }
 
 private object KuduConnection {
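For callers that use KuduContext directly rather than the data source, the methods
above map one-to-one onto the new operation types. A small sketch (master address,
table name, and DataFrames are illustrative):

    // Hypothetical example of the KuduContext write methods shown above.
    val kuduContext = new KuduContext("master-host:7051")
    kuduContext.insertRows(newRowsDF, "my_table")        // errors on rows whose key already exists
    kuduContext.insertIgnoreRows(newRowsDF, "my_table")  // silently skips duplicate-key rows
    kuduContext.upsertRows(rowsDF, "my_table")           // inserts new rows, overwrites existing ones
    kuduContext.updateRows(changedDF, "my_table")        // rows must already exist
    kuduContext.deleteRows(keysDF, "my_table")           // only the key columns should be set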
http://git-wip-us.apache.org/repos/asf/kudu/blob/9acba45a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index b3b69ec..bfd0b55 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -50,10 +50,10 @@ class KuduRDD(val kuduMaster: String,
 
   override protected def getPartitions: Array[Partition] = {
     val builder = kuduContext.syncClient
-      .newScanTokenBuilder(table)
-      .batchSizeBytes(batchSize)
-      .setProjectedColumnNames(projectedCols.toSeq.asJava)
-      .cacheBlocks(true)
+                             .newScanTokenBuilder(table)
+                             .batchSizeBytes(batchSize)
+                             .setProjectedColumnNames(projectedCols.toSeq.asJava)
+                             .cacheBlocks(true)
 
     for (predicate <- predicates) {
       builder.addPredicate(predicate)
@@ -83,7 +83,7 @@
  */
 private[spark] class KuduPartition(val index: Int,
                                    val scanToken: Array[Byte],
-                                   val locations : Array[String]) extends Partition {}
+                                   val locations: Array[String]) extends Partition {}
 
 /**
  * A Spark SQL [[Row]] iterator which wraps a [[KuduScanner]].
http://git-wip-us.apache.org/repos/asf/kudu/blob/9acba45a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
new file mode 100644
index 0000000..fd23d05
--- /dev/null
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kudu.spark.kudu
+
+import org.apache.kudu.client.{KuduTable, Operation}
+
+/**
+ * OperationType enumerates the types of Kudu write operations.
+ */
+private[kudu] sealed trait OperationType {
+  def operation(table: KuduTable): Operation
+  def ignoreDuplicateRowErrors: Boolean = false
+}
+private[kudu] case object Insert extends OperationType {
+  override def operation(table: KuduTable): Operation = table.newInsert()
+}
+private[kudu] case object InsertIgnore extends OperationType {
+  override def operation(table: KuduTable): Operation = table.newInsert()
+  override def ignoreDuplicateRowErrors: Boolean = true
+}
+private[kudu] case object Update extends OperationType {
+  override def operation(table: KuduTable): Operation = table.newUpdate()
+}
+private[kudu] case object Upsert extends OperationType {
+  override def operation(table: KuduTable): Operation = table.newUpsert()
+}
+private[kudu] case object Delete extends OperationType {
+  override def operation(table: KuduTable): Operation = table.newDelete()
+}
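The sealed trait keeps option parsing in DefaultSource and session handling in
KuduContext decoupled from the concrete Kudu client Operation classes. A sketch of
how the write path consumes it (mirroring writePartitionRows above; `table` and
`session` are assumed to be an open KuduTable and KuduSession):

    // Hypothetical fragment: insert-ignore is a plain insert plus ignoring
    // duplicate-row errors on the session.
    val op: Operation = InsertIgnore.operation(table)                         // table.newInsert()
    session.setIgnoreAllDuplicateRows(InsertIgnore.ignoreDuplicateRowErrors)  // true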
http://git-wip-us.apache.org/repos/asf/kudu/blob/9acba45a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
index fe28f70..6f49023 100755
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
@@ -35,4 +35,4 @@ package object kudu {
   implicit class KuduDataFrameWriter(writer: DataFrameWriter) {
     def kudu = writer.format("org.apache.kudu.spark.kudu").save
   }
-}
+}
\ No newline at end of file
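The package object's implicit classes provide shorthand for the format(...).save/load
calls, so the new option composes with the existing `.kudu` helper. A sketch
(kuduOptions is assumed to be a Map holding the master and table keys, as in the
tests below):

    // Hypothetical example using the implicit DataFrameWriter shorthand.
    import org.apache.kudu.spark.kudu._
    df.write
      .options(kuduOptions + ("kudu.operation" -> "delete"))
      .mode("append")
      .kudu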
http://git-wip-us.apache.org/repos/asf/kudu/blob/9acba45a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index a7dd209..2dfce76 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -124,6 +124,29 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter {
     deleteRow(101)
   }
 
+  test("insert ignore rows") {
+    val df = sqlContext.read.options(kuduOptions).kudu
+    val baseDF = df.limit(1) // filter down to just the first row
+
+    // change the c2 string to abc and insert
+    val updateDF = baseDF.withColumn("c2_s", lit("abc"))
+    kuduContext.insertIgnoreRows(updateDF, tableName)
+
+    // change the key and insert
+    val insertDF = df.limit(1).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("def"))
+    kuduContext.insertIgnoreRows(insertDF, tableName)
+
+    // read the data back
+    val newDF = sqlContext.read.options(kuduOptions).kudu
+    val collectedUpdate = newDF.filter("key = 0").collect()
+    assertEquals("0", collectedUpdate(0).getAs[String]("c2_s"))
+    val collectedInsert = newDF.filter("key = 100").collect()
+    assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
+
+    // restore the original state of the table
+    deleteRow(100)
+  }
+
   test("upsert rows") {
     val df = sqlContext.read.options(kuduOptions).kudu
     val baseDF = df.limit(1) // filter down to just the first row