Repository: kudu
Updated Branches:
refs/heads/master d6bdcff48 -> e82dc1e2a
[java] Move some methods to a SparkUtil class
Moves some utility methods into a SparkUtil object that is
annotated @InterfaceAudience.Private. These methods are useful
for the backup application work, and consolidating them here
minimizes duplication.
Change-Id: Ib78ec895b25220668757432de287f6d0e5ed5ccc
Reviewed-on: http://gerrit.cloudera.org:8080/10572
Reviewed-by: Hao Hao <hao.hao@cloudera.com>
Tested-by: Grant Henke <granthenke@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e82dc1e2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e82dc1e2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e82dc1e2
Branch: refs/heads/master
Commit: e82dc1e2a66c1cd84eda7395911c8e4aa8ada615
Parents: d6bdcff
Author: Grant Henke <granthenke@apache.org>
Authored: Fri Jun 1 10:21:00 2018 -0500
Committer: Grant Henke <granthenke@apache.org>
Committed: Wed Jun 6 15:24:00 2018 +0000
----------------------------------------------------------------------
.../apache/kudu/spark/kudu/DefaultSource.scala | 39 +-----
.../apache/kudu/spark/kudu/KuduContext.scala | 49 +-------
.../org/apache/kudu/spark/kudu/SparkUtil.scala | 123 +++++++++++++++++++
3 files changed, 131 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
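For context, a minimal usage sketch of the consolidated helpers; the
schema, value names, and column names below are hypothetical, while the
SparkUtil methods themselves come from the diff that follows:

import org.apache.spark.sql.types._
import org.apache.kudu.spark.kudu.SparkUtil

// Hypothetical Spark SQL schema; "id" serves as the single key column.
val sparkStructType = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("amount", DecimalType(18, 2), nullable = true)))

// Spark SQL schema -> Kudu schema; key columns are added first,
// in the order given.
val kuduTableSchema = SparkUtil.kuduSchema(sparkStructType, Seq("id"))

// Kudu schema -> Spark SQL schema, optionally projecting a subset
// of columns.
val projected = SparkUtil.sparkSchema(kuduTableSchema, Some(Seq("amount")))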
http://git-wip-us.apache.org/repos/asf/kudu/blob/e82dc1e2/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index acfebcc..18cdd07 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -23,16 +23,14 @@ import java.sql.Timestamp
import scala.collection.JavaConverters._
import scala.util.Try
-
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.yetus.audience.InterfaceStability
-
import org.apache.kudu.client.KuduPredicate.ComparisonOp
import org.apache.kudu.client._
-import org.apache.kudu.{ColumnSchema, ColumnTypeAttributes, Type}
+import org.apache.kudu.spark.kudu.SparkUtil._
/**
* Data source for integration with Spark's [[DataFrame]] API.
@@ -181,19 +179,7 @@ class KuduRelation(private val tableName: String,
* @return schema generated from the Kudu table's schema
*/
override def schema: StructType = {
- userSchema match {
- case Some(x) =>
- StructType(x.fields.map(uf => table.getSchema.getColumn(uf.name))
- .map(kuduColumnToSparkField))
- case None =>
- StructType(table.getSchema.getColumns.asScala.map(kuduColumnToSparkField).toArray)
- }
- }
-
- def kuduColumnToSparkField: (ColumnSchema) => StructField = {
- columnSchema =>
- val sparkType = kuduTypeToSparkType(columnSchema.getType, columnSchema.getTypeAttributes)
- new StructField(columnSchema.getName, sparkType, columnSchema.isNullable)
+ sparkSchema(table.getSchema, userSchema.map(_.fieldNames))
}
/**
@@ -334,27 +320,6 @@ class KuduRelation(private val tableName: String,
private[spark] object KuduRelation {
/**
- * Converts a Kudu [[Type]] to a Spark SQL [[DataType]].
- *
- * @param t the Kudu type
- * @param a the Kudu type attributes
- * @return the corresponding Spark SQL type
- */
- private def kuduTypeToSparkType(t: Type, a: ColumnTypeAttributes): DataType = t match {
- case Type.BOOL => BooleanType
- case Type.INT8 => ByteType
- case Type.INT16 => ShortType
- case Type.INT32 => IntegerType
- case Type.INT64 => LongType
- case Type.UNIXTIME_MICROS => TimestampType
- case Type.FLOAT => FloatType
- case Type.DOUBLE => DoubleType
- case Type.STRING => StringType
- case Type.BINARY => BinaryType
- case Type.DECIMAL => DecimalType(a.getPrecision, a.getScale)
- }
-
- /**
* Returns `true` if the filter is able to be pushed down to Kudu.
*
* @param filter the filter to test
http://git-wip-us.apache.org/repos/asf/kudu/blob/e82dc1e2/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index 867cf19..af56606 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -18,7 +18,6 @@
package org.apache.kudu.spark.kudu
import java.security.{AccessController, PrivilegedAction}
-import java.util
import javax.security.auth.Subject
import javax.security.auth.login.{AppConfigurationEntry, Configuration, LoginContext}
@@ -26,10 +25,9 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.hadoop.util.ShutdownHookManager
-import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, StructField, StructType}
+import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, StructType}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.util.AccumulatorV2
import org.apache.yetus.audience.InterfaceStability
@@ -38,7 +36,8 @@ import org.slf4j.{Logger, LoggerFactory}
import org.apache.kudu.client.SessionConfiguration.FlushMode
import org.apache.kudu.client._
import org.apache.kudu.spark.kudu
-import org.apache.kudu.{ColumnSchema, Schema, Type}
+import org.apache.kudu.spark.kudu.SparkUtil._
+import org.apache.kudu.{Schema, Type}
/**
* KuduContext is a serializable container for Kudu client connections.
@@ -186,48 +185,12 @@ class KuduContext(val kuduMaster: String,
* @return the Kudu schema
*/
def createSchema(schema: StructType, keys: Seq[String]): Schema = {
- val kuduCols = new util.ArrayList[ColumnSchema]()
- // add the key columns first, in the order specified
- for (key <- keys) {
- val field = schema.fields(schema.fieldIndex(key))
- val col = createColumn(field, isKey = true)
- kuduCols.add(col)
- }
- // now add the non-key columns
- for (field <- schema.fields.filter(field => !keys.contains(field.name))) {
- val col = createColumn(field, isKey = false)
- kuduCols.add(col)
- }
- new Schema(kuduCols)
- }
-
- private def createColumn(field: StructField, isKey: Boolean): ColumnSchema = {
- val kt = kuduType(field.dataType)
- val col = new ColumnSchema.ColumnSchemaBuilder(field.name, kt).key(isKey).nullable(field.nullable)
- // Add ColumnTypeAttributesBuilder to DECIMAL columns
- if (kt == Type.DECIMAL) {
- val dt = field.dataType.asInstanceOf[DecimalType]
- col.typeAttributes(
- new ColumnTypeAttributesBuilder().precision(dt.precision).scale(dt.scale).build()
- )
- }
- col.build()
+ kuduSchema(schema, keys)
}
/** Map Spark SQL type to Kudu type */
- def kuduType(dt: DataType) : Type = dt match {
- case DataTypes.BinaryType => Type.BINARY
- case DataTypes.BooleanType => Type.BOOL
- case DataTypes.StringType => Type.STRING
- case DataTypes.TimestampType => Type.UNIXTIME_MICROS
- case DataTypes.ByteType => Type.INT8
- case DataTypes.ShortType => Type.INT16
- case DataTypes.IntegerType => Type.INT32
- case DataTypes.LongType => Type.INT64
- case DataTypes.FloatType => Type.FLOAT
- case DataTypes.DoubleType => Type.DOUBLE
- case DecimalType() => Type.DECIMAL
- case _ => throw new IllegalArgumentException(s"No support for Spark SQL type $dt")
+ def kuduType(dt: DataType) : Type = {
+ sparkTypeToKuduType(dt)
}
/**
http://git-wip-us.apache.org/repos/asf/kudu/blob/e82dc1e2/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala
new file mode 100644
index 0000000..b9b34f1
--- /dev/null
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala
@@ -0,0 +1,123 @@
+package org.apache.kudu.spark.kudu
+
+import java.util
+
+import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder
+import org.apache.kudu.{ColumnSchema, ColumnTypeAttributes, Schema, Type}
+import org.apache.spark.sql.types._
+import org.apache.yetus.audience.{InterfaceAudience, InterfaceStability}
+
+import scala.collection.JavaConverters._
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+object SparkUtil {
+
+ /**
+ * Converts a Kudu [[Type]] to a Spark SQL [[DataType]].
+ *
+ * @param t the Kudu type
+ * @param a the Kudu type attributes
+ * @return the corresponding Spark SQL type
+ */
+ def kuduTypeToSparkType(t: Type, a: ColumnTypeAttributes): DataType = t match {
+ case Type.BOOL => BooleanType
+ case Type.INT8 => ByteType
+ case Type.INT16 => ShortType
+ case Type.INT32 => IntegerType
+ case Type.INT64 => LongType
+ case Type.UNIXTIME_MICROS => TimestampType
+ case Type.FLOAT => FloatType
+ case Type.DOUBLE => DoubleType
+ case Type.STRING => StringType
+ case Type.BINARY => BinaryType
+ case Type.DECIMAL => DecimalType(a.getPrecision, a.getScale)
+ case _ => throw new IllegalArgumentException(s"No support for Kudu type $t")
+ }
+
+ /**
+ * Converts a Spark SQL [[DataType]] to a Kudu [[Type]].
+ *
+ * @param dt the Spark SQL type
+   * @return the corresponding Kudu type
+ */
+ def sparkTypeToKuduType(dt: DataType) : Type = dt match {
+ case DataTypes.BinaryType => Type.BINARY
+ case DataTypes.BooleanType => Type.BOOL
+ case DataTypes.StringType => Type.STRING
+ case DataTypes.TimestampType => Type.UNIXTIME_MICROS
+ case DataTypes.ByteType => Type.INT8
+ case DataTypes.ShortType => Type.INT16
+ case DataTypes.IntegerType => Type.INT32
+ case DataTypes.LongType => Type.INT64
+ case DataTypes.FloatType => Type.FLOAT
+ case DataTypes.DoubleType => Type.DOUBLE
+ case DecimalType() => Type.DECIMAL
+ case _ => throw new IllegalArgumentException(s"No support for Spark SQL type $dt")
+ }
+
+ /**
+ * Generates a SparkSQL schema from a Kudu schema.
+ *
+ * @param kuduSchema the Kudu schema
+ * @param fields an optional column projection
+ * @return the SparkSQL schema
+ */
+ def sparkSchema(kuduSchema: Schema, fields: Option[Seq[String]] = None): StructType = {
+ val kuduColumns = fields match {
+ case Some(fieldNames) => fieldNames.map(kuduSchema.getColumn)
+ case None => kuduSchema.getColumns.asScala
+ }
+ val sparkColumns = kuduColumns.map { col =>
+ val sparkType = kuduTypeToSparkType(col.getType, col.getTypeAttributes)
+ StructField(col.getName, sparkType, col.isNullable)
+ }
+ StructType(sparkColumns)
+ }
+
+ /**
+ * Generates a Kudu schema from a SparkSQL schema.
+ *
+ * @param sparkSchema the SparkSQL schema
+ * @param keys the ordered names of key columns
+ * @return the Kudu schema
+ */
+ def kuduSchema(sparkSchema: StructType, keys: Seq[String]): Schema = {
+ val kuduCols = new util.ArrayList[ColumnSchema]()
+ // add the key columns first, in the order specified
+ for (key <- keys) {
+ val field = sparkSchema.fields(sparkSchema.fieldIndex(key))
+ val col = createColumnSchema(field, isKey = true)
+ kuduCols.add(col)
+ }
+ // now add the non-key columns
+ for (field <- sparkSchema.fields.filter(field => !keys.contains(field.name))) {
+ val col = createColumnSchema(field, isKey = false)
+ kuduCols.add(col)
+ }
+ new Schema(kuduCols)
+ }
+
+ /**
+ * Generates a Kudu column schema from a SparkSQL field.
+ *
+ * @param field the SparkSQL field
+ * @param isKey true if the column is a key
+ * @return the Kudu column schema
+ */
+ private def createColumnSchema(field: StructField, isKey: Boolean): ColumnSchema = {
+ val kt = sparkTypeToKuduType(field.dataType)
+ val col = new ColumnSchema.ColumnSchemaBuilder(field.name, kt)
+ .key(isKey)
+ .nullable(field.nullable)
+ // Add ColumnTypeAttributesBuilder to DECIMAL columns
+ if (kt == Type.DECIMAL) {
+ val dt = field.dataType.asInstanceOf[DecimalType]
+ col.typeAttributes(
+ new ColumnTypeAttributesBuilder().precision(dt.precision).scale(dt.scale).build()
+ )
+ }
+ col.build()
+ }
+
+}
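A quick sketch of the type mappings in isolation (the precision and
scale values are arbitrary; the methods and builder are those shown in
SparkUtil.scala above):

import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder
import org.apache.kudu.Type
import org.apache.spark.sql.types.{DataTypes, DecimalType}
import org.apache.kudu.spark.kudu.SparkUtil._

// Most types map one-to-one in both directions.
assert(sparkTypeToKuduType(DataTypes.TimestampType) == Type.UNIXTIME_MICROS)

// DECIMAL is the one type whose Spark form depends on Kudu's
// ColumnTypeAttributes (precision and scale).
val attrs = new ColumnTypeAttributesBuilder().precision(10).scale(4).build()
assert(kuduTypeToSparkType(Type.DECIMAL, attrs) == DecimalType(10, 4))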