From commits-return-6468-archive-asf-public=cust-asf.ponee.io@kudu.apache.org Tue Oct 2 21:01:56 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 1A5EE180791 for ; Tue, 2 Oct 2018 21:01:54 +0200 (CEST) Received: (qmail 86257 invoked by uid 500); 2 Oct 2018 19:01:54 -0000 Mailing-List: contact commits-help@kudu.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kudu.apache.org Delivered-To: mailing list commits@kudu.apache.org Received: (qmail 86136 invoked by uid 99); 2 Oct 2018 19:01:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Oct 2018 19:01:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 138BEE0054; Tue, 2 Oct 2018 19:01:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: granthenke@apache.org To: commits@kudu.apache.org Date: Tue, 02 Oct 2018 19:01:56 -0000 Message-Id: <0dc520efab844b44843a1be7d286065f@git.apache.org> In-Reply-To: <8fca6d938fb7438fb6f36330eb763c58@git.apache.org> References: <8fca6d938fb7438fb6f36330eb763c58@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/6] kudu git commit: [spark] Add KuduReadOptions to encapsulate the read parameters [spark] Add KuduReadOptions to encapsulate the read parameters This patch adds KuduReadOptions and refactors the existing private APIs to leverage it. Additionally some refactoring was done to KuduWriteOptions to ensure the interaction with both classes was unified. This patch is a preliminary step to adding keep alive support to the Spark integration. 
Change-Id: Iaee3d09471cad2cddd816ec77eafaae457faf1c2 Reviewed-on: http://gerrit.cloudera.org:8080/11537 Tested-by: Kudu Jenkins Reviewed-by: Hao Hao Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/545afa7a Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/545afa7a Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/545afa7a Branch: refs/heads/master Commit: 545afa7a4c952457b11889b6b552685dadb0750b Parents: dc8ae79 Author: Grant Henke Authored: Thu Sep 27 10:50:09 2018 -0700 Committer: Grant Henke Committed: Tue Oct 2 19:01:16 2018 +0000 ---------------------------------------------------------------------- .../apache/kudu/spark/kudu/DefaultSource.scala | 152 +++++++++---------- .../apache/kudu/spark/kudu/KuduContext.scala | 22 +-- .../org/apache/kudu/spark/kudu/KuduRDD.scala | 32 ++-- .../kudu/spark/kudu/KuduReadOptions.scala | 50 ++++++ .../kudu/spark/kudu/KuduWriteOptions.scala | 21 ++- .../org/apache/kudu/spark/kudu/SparkUtil.scala | 7 +- .../kudu/spark/kudu/DefaultSourceTest.scala | 19 +-- 7 files changed, 170 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/545afa7a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala index 3f9ae0a..29635a3 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala @@ -30,9 +30,13 @@ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.Row import org.apache.spark.sql.SQLContext import org.apache.spark.sql.SaveMode +import 
org.apache.yetus.audience.InterfaceAudience import org.apache.yetus.audience.InterfaceStability + import org.apache.kudu.client.KuduPredicate.ComparisonOp import org.apache.kudu.client._ +import org.apache.kudu.spark.kudu.KuduReadOptions._ +import org.apache.kudu.spark.kudu.KuduWriteOptions._ import org.apache.kudu.spark.kudu.SparkUtil._ /** @@ -43,6 +47,7 @@ import org.apache.kudu.spark.kudu.SparkUtil._ * `DefaultSource` when the user specifies the path of a source during DDL * operations through [[org.apache.spark.sql.DataFrameReader.format]]. */ +@InterfaceAudience.Private @InterfaceStability.Unstable class DefaultSource extends RelationProvider with CreatableRelationProvider with SchemaRelationProvider { @@ -56,16 +61,7 @@ class DefaultSource val IGNORE_DUPLICATE_ROW_ERRORS = "kudu.ignoreDuplicateRowErrors" val SCAN_REQUEST_TIMEOUT_MS = "kudu.scanRequestTimeoutMs" val SOCKET_READ_TIMEOUT_MS = "kudu.socketReadTimeoutMs" - - def defaultMasterAddrs: String = InetAddress.getLocalHost.getCanonicalHostName - - def getScanRequestTimeoutMs(parameters: Map[String, String]): Option[Long] = { - parameters.get(SCAN_REQUEST_TIMEOUT_MS).map(_.toLong) - } - - def getSocketReadTimeoutMs(parameters: Map[String, String]): Option[Long] = { - parameters.get(SOCKET_READ_TIMEOUT_MS).map(_.toLong) - } + val BATCH_SIZE = "kudu.batchSize" /** * Construct a BaseRelation using the provided context and parameters. @@ -77,33 +73,37 @@ class DefaultSource override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { + createRelation(sqlContext, parameters, null) + } + + /** + * Construct a BaseRelation using the provided context, parameters and schema. 
+ * + * @param sqlContext SparkSQL context + * @param parameters parameters given to us from SparkSQL + * @param schema the schema used to select columns for the relation + * @return a BaseRelation Object + */ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { val tableName = parameters.getOrElse( TABLE_KEY, throw new IllegalArgumentException( s"Kudu table name must be specified in create options using key '$TABLE_KEY'")) val kuduMaster = parameters.getOrElse(KUDU_MASTER, defaultMasterAddrs) val operationType = getOperationType(parameters.getOrElse(OPERATION, "upsert")) - val faultTolerantScanner = - Try(parameters.getOrElse(FAULT_TOLERANT_SCANNER, "false").toBoolean) - .getOrElse(false) - val scanLocality = getScanLocalityType(parameters.getOrElse(SCAN_LOCALITY, "closest_replica")) - val ignoreDuplicateRowErrors = Try(parameters(IGNORE_DUPLICATE_ROW_ERRORS).toBoolean) - .getOrElse(false) || - Try(parameters(OPERATION) == "insert-ignore").getOrElse(false) - val ignoreNull = - Try(parameters.getOrElse(IGNORE_NULL, "false").toBoolean).getOrElse(false) - val writeOptions = - new KuduWriteOptions(ignoreDuplicateRowErrors, ignoreNull) + val schemaOption = Option(schema) + val readOptions = getReadOptions(parameters) + val writeOptions = getWriteOptions(parameters) new KuduRelation( tableName, kuduMaster, - faultTolerantScanner, - scanLocality, - getScanRequestTimeoutMs(parameters), - getSocketReadTimeoutMs(parameters), operationType, - None, + schemaOption, + readOptions, writeOptions )(sqlContext) } @@ -130,36 +130,47 @@ class DefaultSource case _ => throw new UnsupportedOperationException("Currently, only Append is supported") } - kuduRelation } - override def createRelation( - sqlContext: SQLContext, - parameters: Map[String, String], - schema: StructType): BaseRelation = { - val tableName = parameters.getOrElse( - TABLE_KEY, - throw new IllegalArgumentException( - s"Kudu table name must 
be specified in create options " + - s"using key '$TABLE_KEY'")) - val kuduMaster = parameters.getOrElse(KUDU_MASTER, defaultMasterAddrs) - val operationType = getOperationType(parameters.getOrElse(OPERATION, "upsert")) + private def getReadOptions(parameters: Map[String, String]): KuduReadOptions = { + val batchSize = parameters.get(BATCH_SIZE).map(_.toInt).getOrElse(defaultBatchSize) val faultTolerantScanner = - Try(parameters.getOrElse(FAULT_TOLERANT_SCANNER, "false").toBoolean) - .getOrElse(false) - val scanLocality = getScanLocalityType(parameters.getOrElse(SCAN_LOCALITY, "closest_replica")) + parameters.get(FAULT_TOLERANT_SCANNER).map(_.toBoolean).getOrElse(defaultFaultTolerantScanner) + val scanLocality = + parameters.get(SCAN_LOCALITY).map(getScanLocalityType).getOrElse(defaultScanLocality) + val scanRequestTimeoutMs = parameters.get(SCAN_REQUEST_TIMEOUT_MS).map(_.toLong) + val socketReadTimeoutMs = parameters.get(SOCKET_READ_TIMEOUT_MS).map(_.toLong) - new KuduRelation( - tableName, - kuduMaster, - faultTolerantScanner, + KuduReadOptions( + batchSize, scanLocality, - getScanRequestTimeoutMs(parameters), - getSocketReadTimeoutMs(parameters), - operationType, - Some(schema) - )(sqlContext) + faultTolerantScanner, + scanRequestTimeoutMs, + socketReadTimeoutMs) + } + + private def getWriteOptions(parameters: Map[String, String]): KuduWriteOptions = { + val ignoreDuplicateRowErrors = + Try(parameters(IGNORE_DUPLICATE_ROW_ERRORS).toBoolean).getOrElse(false) || + Try(parameters(OPERATION) == "insert-ignore").getOrElse(false) + val ignoreNull = + parameters.get(IGNORE_NULL).map(_.toBoolean).getOrElse(defaultIgnoreNull) + + Try(parameters.getOrElse(IGNORE_NULL, "false").toBoolean).getOrElse(false) + + KuduWriteOptions(ignoreDuplicateRowErrors, ignoreNull) + } + + private def defaultMasterAddrs: String = InetAddress.getLocalHost.getCanonicalHostName + + private def getScanLocalityType(opParam: String): ReplicaSelection = { + opParam.toLowerCase match { + case 
"leader_only" => ReplicaSelection.LEADER_ONLY + case "closest_replica" => ReplicaSelection.CLOSEST_REPLICA + case _ => + throw new IllegalArgumentException(s"Unsupported replica selection type '$opParam'") + } } private def getOperationType(opParam: String): OperationType = { @@ -173,15 +184,6 @@ class DefaultSource throw new IllegalArgumentException(s"Unsupported operation type '$opParam'") } } - - private def getScanLocalityType(opParam: String): ReplicaSelection = { - opParam.toLowerCase match { - case "leader_only" => ReplicaSelection.LEADER_ONLY - case "closest_replica" => ReplicaSelection.CLOSEST_REPLICA - case _ => - throw new IllegalArgumentException(s"Unsupported replica selection type '$opParam'") - } - } } /** @@ -189,31 +191,25 @@ class DefaultSource * * @param tableName Kudu table that we plan to read from * @param masterAddrs Kudu master addresses - * @param faultTolerantScanner scanner type to be used. Fault tolerant if true, - * otherwise, use non fault tolerant one - * @param scanLocality If true scan locality is enabled, so that the scan will - * take place at the closest replica. 
- * @param scanRequestTimeoutMs Maximum time allowed per scan request, in milliseconds * @param operationType The default operation type to perform when writing to the relation * @param userSchema A schema used to select columns for the relation + * @param readOptions Kudu read options * @param writeOptions Kudu write options * @param sqlContext SparkSQL context */ +@InterfaceAudience.Private @InterfaceStability.Unstable class KuduRelation( - private val tableName: String, - private val masterAddrs: String, - private val faultTolerantScanner: Boolean, - private val scanLocality: ReplicaSelection, - private[kudu] val scanRequestTimeoutMs: Option[Long], - private[kudu] val socketReadTimeoutMs: Option[Long], - private val operationType: OperationType, - private val userSchema: Option[StructType], - private val writeOptions: KuduWriteOptions = new KuduWriteOptions)(val sqlContext: SQLContext) + val tableName: String, + val masterAddrs: String, + val operationType: OperationType, + val userSchema: Option[StructType], + val readOptions: KuduReadOptions = new KuduReadOptions, + val writeOptions: KuduWriteOptions = new KuduWriteOptions)(val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan with InsertableRelation { private val context: KuduContext = - new KuduContext(masterAddrs, sqlContext.sparkContext, socketReadTimeoutMs) + new KuduContext(masterAddrs, sqlContext.sparkContext, readOptions.socketReadTimeoutMs) private val table: KuduTable = context.syncClient.openTable(tableName) @@ -241,14 +237,10 @@ class KuduRelation( val predicates = filters.flatMap(filterToPredicate) new KuduRDD( context, - 1024 * 1024 * 20, + table, requiredColumns, predicates, - table, - faultTolerantScanner, - scanLocality, - scanRequestTimeoutMs, - socketReadTimeoutMs, + readOptions, sqlContext.sparkContext ) } http://git-wip-us.apache.org/repos/asf/kudu/blob/545afa7a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala 
---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala index a9975b6..5a12ad6 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala @@ -20,8 +20,6 @@ package org.apache.kudu.spark.kudu import java.security.AccessController import java.security.PrivilegedAction -import java.sql.Timestamp - import javax.security.auth.Subject import javax.security.auth.login.AppConfigurationEntry import javax.security.auth.login.Configuration @@ -45,6 +43,7 @@ import org.apache.yetus.audience.InterfaceAudience import org.apache.yetus.audience.InterfaceStability import org.slf4j.Logger import org.slf4j.LoggerFactory + import org.apache.kudu.client.SessionConfiguration.FlushMode import org.apache.kudu.client._ import org.apache.kudu.spark.kudu.SparkUtil._ @@ -58,7 +57,8 @@ import org.apache.kudu.Type * [[KuduContext]] should be created in the driver, and shared with executors * as a serializable field. 
*/ -@InterfaceStability.Unstable +@InterfaceAudience.Public +@InterfaceStability.Evolving class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeoutMs: Option[Long]) extends Serializable { @@ -137,19 +137,14 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou def kuduRDD( sc: SparkContext, tableName: String, - columnProjection: Seq[String] = Nil): RDD[Row] = { - // TODO: provide an elegant way to pass various options (faultTolerantScan, - // TODO: localityScan, etc) to KuduRDD + columnProjection: Seq[String] = Nil, + options: KuduReadOptions = KuduReadOptions()): RDD[Row] = { new KuduRDD( this, - 1024 * 1024 * 20, + syncClient.openTable(tableName), columnProjection.toArray, Array(), - syncClient.openTable(tableName), - false, - ReplicaSelection.LEADER_ONLY, - None, - None, + options, sc) } @@ -246,8 +241,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou @deprecated( "Use KuduContext.insertRows(data, tableName, new KuduWriteOptions(ignoreDuplicateRowErrors = true))") def insertIgnoreRows(data: DataFrame, tableName: String): Unit = { - val writeOptions = new KuduWriteOptions - writeOptions.ignoreDuplicateRowErrors = true + val writeOptions = KuduWriteOptions(ignoreDuplicateRowErrors = true) writeRows(data, tableName, Insert, writeOptions) } http://git-wip-us.apache.org/repos/asf/kudu/blob/545afa7a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala index c16a6a0..77dabcc 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala @@ -17,12 +17,13 @@ package org.apache.kudu.spark.kudu import scala.collection.JavaConverters._ - 
import org.apache.spark.rdd.RDD import org.apache.spark.sql.Row import org.apache.spark.Partition import org.apache.spark.SparkContext import org.apache.spark.TaskContext +import org.apache.yetus.audience.InterfaceAudience +import org.apache.yetus.audience.InterfaceStability import org.apache.kudu.client._ import org.apache.kudu.Type @@ -33,45 +34,42 @@ import org.apache.kudu.client * * To construct a KuduRDD, use [[KuduContext#kuduRDD]] or a Kudu DataSource. */ +@InterfaceAudience.Public +@InterfaceStability.Evolving class KuduRDD private[kudu] ( val kuduContext: KuduContext, - @transient val batchSize: Integer, + @transient val table: KuduTable, @transient val projectedCols: Array[String], @transient val predicates: Array[client.KuduPredicate], - @transient val table: KuduTable, - @transient val isFaultTolerant: Boolean, - @transient val scanLocality: ReplicaSelection, - @transient val scanRequestTimeoutMs: Option[Long], - @transient val socketReadTimeoutMs: Option[Long], + @transient val options: KuduReadOptions, @transient val sc: SparkContext) extends RDD[Row](sc, Nil) { override protected def getPartitions: Array[Partition] = { val builder = kuduContext.syncClient .newScanTokenBuilder(table) - .batchSizeBytes(batchSize) + .batchSizeBytes(options.batchSize) .setProjectedColumnNames(projectedCols.toSeq.asJava) - .setFaultTolerant(isFaultTolerant) + .setFaultTolerant(options.faultTolerantScanner) .cacheBlocks(true) // A scan is partitioned to multiple ones. If scan locality is enabled, // each will take place at the closet replica from the executor. In this // case, to ensure the consistency of such scan, we use READ_AT_SNAPSHOT // read mode without setting a timestamp. 
- if (scanLocality == ReplicaSelection.CLOSEST_REPLICA) { - builder - .replicaSelection(ReplicaSelection.CLOSEST_REPLICA) - .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT) + builder.replicaSelection(options.scanLocality) + if (options.scanLocality == ReplicaSelection.CLOSEST_REPLICA) { + builder.readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT) } - scanRequestTimeoutMs match { - case Some(timeout) => builder.scanRequestTimeout(timeout) - case _ => + options.scanRequestTimeoutMs.foreach { timeout => + builder.scanRequestTimeout(timeout) } for (predicate <- predicates) { builder.addPredicate(predicate) } + val tokens = builder.build().asScala tokens.zipWithIndex.map { case (token, index) => @@ -79,7 +77,7 @@ class KuduRDD private[kudu] ( // replica selection policy is leader only, to take advantage // of scan locality. var locations: Array[String] = null - if (scanLocality == ReplicaSelection.LEADER_ONLY) { + if (options.scanLocality == ReplicaSelection.LEADER_ONLY) { locations = Array(token.getTablet.getLeaderReplica.getRpcHost) } else { locations = token.getTablet.getReplicas.asScala.map(_.getRpcHost).toArray http://git-wip-us.apache.org/repos/asf/kudu/blob/545afa7a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala new file mode 100644 index 0000000..7c9b888 --- /dev/null +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kudu.spark.kudu + +import org.apache.yetus.audience.InterfaceAudience +import org.apache.yetus.audience.InterfaceStability + +import org.apache.kudu.client.ReplicaSelection +import org.apache.kudu.spark.kudu.KuduReadOptions._ + +/** + * KuduReadOptions holds configuration of reads to Kudu tables. + * + * @param batchSize Sets the maximum number of bytes returned by the scanner, on each batch. + * @param scanLocality If true scan locality is enabled, so that the scan will + * take place at the closest replica + * @param faultTolerantScanner scanner type to be used. Fault tolerant if true, + * otherwise, use non fault tolerant one + * @param scanRequestTimeoutMs Maximum time allowed per scan request, in milliseconds + * @param socketReadTimeoutMs Maximum time allowed when waiting on data from a socket + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +case class KuduReadOptions( + batchSize: Int = defaultBatchSize, + scanLocality: ReplicaSelection = defaultScanLocality, + faultTolerantScanner: Boolean = defaultFaultTolerantScanner, + scanRequestTimeoutMs: Option[Long] = None, + socketReadTimeoutMs: Option[Long] = None) + +object KuduReadOptions { + val defaultBatchSize: Int = 1024 * 1024 * 20 // TODO: Understand/doc this setting? 
+ val defaultScanLocality: ReplicaSelection = ReplicaSelection.CLOSEST_REPLICA + val defaultFaultTolerantScanner: Boolean = false +} http://git-wip-us.apache.org/repos/asf/kudu/blob/545afa7a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala index 1eb21ca..22a6886 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala @@ -17,20 +17,25 @@ package org.apache.kudu.spark.kudu +import org.apache.yetus.audience.InterfaceAudience import org.apache.yetus.audience.InterfaceStability +import org.apache.kudu.spark.kudu.KuduWriteOptions._ + /** * KuduWriteOptions holds configuration of writes to Kudu tables. * - * The instance of this class is passed to KuduContext write functions, - * such as insertRows, deleteRows, upsertRows, and updateRows. 
- * * @param ignoreDuplicateRowErrors when inserting, ignore any new rows that * have a primary key conflict with existing rows * @param ignoreNull update only non-Null columns if set true */ -@InterfaceStability.Unstable -class KuduWriteOptions( - var ignoreDuplicateRowErrors: Boolean = false, - var ignoreNull: Boolean = false) - extends Serializable +@InterfaceAudience.Public +@InterfaceStability.Evolving +case class KuduWriteOptions( + ignoreDuplicateRowErrors: Boolean = defaultIgnoreDuplicateRowErrors, + ignoreNull: Boolean = defaultIgnoreNull) + +object KuduWriteOptions { + val defaultIgnoreDuplicateRowErrors: Boolean = false + val defaultIgnoreNull: Boolean = false +} http://git-wip-us.apache.org/repos/asf/kudu/blob/545afa7a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala index 69b6ca4..aae386f 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala @@ -2,14 +2,15 @@ package org.apache.kudu.spark.kudu import java.util +import org.apache.spark.sql.types._ +import org.apache.yetus.audience.InterfaceAudience +import org.apache.yetus.audience.InterfaceStability + import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder import org.apache.kudu.ColumnSchema import org.apache.kudu.ColumnTypeAttributes import org.apache.kudu.Schema import org.apache.kudu.Type -import org.apache.spark.sql.types._ -import org.apache.yetus.audience.InterfaceAudience -import org.apache.yetus.audience.InterfaceStability import scala.collection.JavaConverters._ http://git-wip-us.apache.org/repos/asf/kudu/blob/545afa7a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala 
---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala index ed56b69..624257c 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala @@ -164,8 +164,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { // change the c2 string to abc and insert val updateDF = baseDF.withColumn("c2_s", lit("abc")) - val kuduWriteOptions = new KuduWriteOptions - kuduWriteOptions.ignoreDuplicateRowErrors = true + val kuduWriteOptions = KuduWriteOptions(ignoreDuplicateRowErrors = true) kuduContext.insertRows(updateDF, tableName, kuduWriteOptions) // change the key and insert @@ -320,14 +319,13 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { val nullDF = sqlContext .createDataFrame(Seq((0, null.asInstanceOf[String]))) .toDF("key", "val") - val kuduWriteOptions = new KuduWriteOptions - kuduWriteOptions.ignoreNull = true - kuduContext.upsertRows(nullDF, simpleTableName, kuduWriteOptions) + val ignoreNullOptions = KuduWriteOptions(ignoreNull = true) + kuduContext.upsertRows(nullDF, simpleTableName, ignoreNullOptions) assert(dataDF.collect.toList === nonNullDF.collect.toList) - kuduWriteOptions.ignoreNull = false + val respectNullOptions = KuduWriteOptions(ignoreNull = false) kuduContext.updateRows(nonNullDF, simpleTableName) - kuduContext.upsertRows(nullDF, simpleTableName, kuduWriteOptions) + kuduContext.upsertRows(nullDF, simpleTableName, respectNullOptions) assert(dataDF.collect.toList === nullDF.collect.toList) kuduContext.updateRows(nonNullDF, simpleTableName) @@ -883,8 +881,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { df("key") .plus(100)) .withColumn("c2_s", lit("def")) - val kuduWriteOptions = new KuduWriteOptions 
- kuduWriteOptions.ignoreDuplicateRowErrors = true + val kuduWriteOptions = KuduWriteOptions(ignoreDuplicateRowErrors = true) kuduContext.insertRows(updateDF, tableName, kuduWriteOptions) assert(kuduContext.syncClient.getLastPropagatedTimestamp > prevTimestamp) } @@ -914,7 +911,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { "kudu.scanRequestTimeoutMs" -> "1") val dataFrame = sqlContext.read.options(kuduOptions).kudu val kuduRelation = kuduRelationFromDataFrame(dataFrame) - assert(kuduRelation.scanRequestTimeoutMs == Some(1)) + assert(kuduRelation.readOptions.scanRequestTimeoutMs == Some(1)) } /** @@ -930,6 +927,6 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { "kudu.socketReadTimeoutMs" -> "1") val dataFrame = sqlContext.read.options(kuduOptions).kudu val kuduRelation = kuduRelationFromDataFrame(dataFrame) - assert(kuduRelation.socketReadTimeoutMs == Some(1)) + assert(kuduRelation.readOptions.socketReadTimeoutMs == Some(1)) } }