kudu-commits mailing list archives

From: granthe...@apache.org
Subject: kudu git commit: [java] Move some methods to a SparkUtil class
Date: Wed, 06 Jun 2018 18:06:25 GMT
Repository: kudu
Updated Branches:
  refs/heads/master d6bdcff48 -> e82dc1e2a


[java] Move some methods to a SparkUtil class

Moves some utility methods to a SparkUtil class
that is marked as private. These methods are useful
in the backup application work, and adding them
here minimizes duplication.

Change-Id: Ib78ec895b25220668757432de287f6d0e5ed5ccc
Reviewed-on: http://gerrit.cloudera.org:8080/10572
Reviewed-by: Hao Hao <hao.hao@cloudera.com>
Tested-by: Grant Henke <granthenke@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e82dc1e2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e82dc1e2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e82dc1e2

Branch: refs/heads/master
Commit: e82dc1e2a66c1cd84eda7395911c8e4aa8ada615
Parents: d6bdcff
Author: Grant Henke <granthenke@apache.org>
Authored: Fri Jun 1 10:21:00 2018 -0500
Committer: Grant Henke <granthenke@apache.org>
Committed: Wed Jun 6 15:24:00 2018 +0000

----------------------------------------------------------------------
 .../apache/kudu/spark/kudu/DefaultSource.scala  |  39 +-----
 .../apache/kudu/spark/kudu/KuduContext.scala    |  49 +-------
 .../org/apache/kudu/spark/kudu/SparkUtil.scala  | 123 +++++++++++++++++++
 3 files changed, 131 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
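
For context before the per-file diffs, a minimal usage sketch of the new
SparkUtil entry points (illustrative only: the object name and example schema
below are hypothetical, and SparkUtil is marked private, so this shows
behavior rather than a supported API):

import org.apache.spark.sql.types._

import org.apache.kudu.spark.kudu.SparkUtil

object SparkUtilSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical Spark SQL schema for illustration.
    val sparkSchema = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("name", StringType, nullable = true),
      StructField("price", DecimalType(10, 2), nullable = true)))

    // Key columns ("id") are added first, in the order given.
    val kuduSchema = SparkUtil.kuduSchema(sparkSchema, Seq("id"))

    // Round-trip back to a Spark SQL schema, optionally projecting columns.
    val roundTripped = SparkUtil.sparkSchema(kuduSchema)
    println(roundTripped.treeString)
  }
}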


http://git-wip-us.apache.org/repos/asf/kudu/blob/e82dc1e2/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index acfebcc..18cdd07 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -23,16 +23,14 @@ import java.sql.Timestamp
 
 import scala.collection.JavaConverters._
 import scala.util.Try
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
 import org.apache.yetus.audience.InterfaceStability
-
 import org.apache.kudu.client.KuduPredicate.ComparisonOp
 import org.apache.kudu.client._
-import org.apache.kudu.{ColumnSchema, ColumnTypeAttributes, Type}
+import org.apache.kudu.spark.kudu.SparkUtil._
 
 /**
   * Data source for integration with Spark's [[DataFrame]] API.
@@ -181,19 +179,7 @@ class KuduRelation(private val tableName: String,
     * @return schema generated from the Kudu table's schema
     */
   override def schema: StructType = {
-    userSchema match {
-      case Some(x) =>
-        StructType(x.fields.map(uf => table.getSchema.getColumn(uf.name))
-          .map(kuduColumnToSparkField))
-      case None =>
-        StructType(table.getSchema.getColumns.asScala.map(kuduColumnToSparkField).toArray)
-    }
-  }
-
-  def kuduColumnToSparkField: (ColumnSchema) => StructField = {
-    columnSchema =>
-      val sparkType = kuduTypeToSparkType(columnSchema.getType, columnSchema.getTypeAttributes)
-      new StructField(columnSchema.getName, sparkType, columnSchema.isNullable)
+    sparkSchema(table.getSchema, userSchema.map(_.fieldNames))
   }
 
   /**
@@ -334,27 +320,6 @@ class KuduRelation(private val tableName: String,
 
 private[spark] object KuduRelation {
   /**
-    * Converts a Kudu [[Type]] to a Spark SQL [[DataType]].
-    *
-    * @param t the Kudu type
-    * @param a the Kudu type attributes
-    * @return the corresponding Spark SQL type
-    */
-  private def kuduTypeToSparkType(t: Type, a: ColumnTypeAttributes): DataType = t match {
-    case Type.BOOL => BooleanType
-    case Type.INT8 => ByteType
-    case Type.INT16 => ShortType
-    case Type.INT32 => IntegerType
-    case Type.INT64 => LongType
-    case Type.UNIXTIME_MICROS => TimestampType
-    case Type.FLOAT => FloatType
-    case Type.DOUBLE => DoubleType
-    case Type.STRING => StringType
-    case Type.BINARY => BinaryType
-    case Type.DECIMAL => DecimalType(a.getPrecision, a.getScale)
-  }
-
-  /**
     * Returns `true` if the filter is able to be pushed down to Kudu.
     *
     * @param filter the filter to test
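
The schema override above now routes any user projection through sparkSchema.
A small illustrative sketch of that projection path, using a hand-built
two-column schema as a stand-in for table.getSchema (object name hypothetical):

import java.util

import org.apache.kudu.{ColumnSchema, Schema, Type}
import org.apache.kudu.spark.kudu.SparkUtil

object ProjectionSketch {
  def main(args: Array[String]): Unit = {
    // Build a small Kudu schema directly, standing in for table.getSchema.
    val cols = new util.ArrayList[ColumnSchema]()
    cols.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT64).key(true).build())
    cols.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build())
    val kuduSchema = new Schema(cols)

    // A projection keeps the requested column order; types and nullability
    // still come from the Kudu schema, as in KuduRelation.schema above.
    val projected = SparkUtil.sparkSchema(kuduSchema, Some(Seq("name", "id")))
    println(projected.fieldNames.mkString(", ")) // name, id
  }
}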

http://git-wip-us.apache.org/repos/asf/kudu/blob/e82dc1e2/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index 867cf19..af56606 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -18,7 +18,6 @@
 package org.apache.kudu.spark.kudu
 
 import java.security.{AccessController, PrivilegedAction}
-import java.util
 import javax.security.auth.Subject
 import javax.security.auth.login.{AppConfigurationEntry, Configuration, LoginContext}
 
@@ -26,10 +25,9 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.hadoop.util.ShutdownHookManager
-import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, StructField, StructType}
+import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, StructType}
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.util.AccumulatorV2
 import org.apache.yetus.audience.InterfaceStability
@@ -38,7 +36,8 @@ import org.slf4j.{Logger, LoggerFactory}
 import org.apache.kudu.client.SessionConfiguration.FlushMode
 import org.apache.kudu.client._
 import org.apache.kudu.spark.kudu
-import org.apache.kudu.{ColumnSchema, Schema, Type}
+import org.apache.kudu.spark.kudu.SparkUtil._
+import org.apache.kudu.{Schema, Type}
 
 /**
   * KuduContext is a serializable container for Kudu client connections.
@@ -186,48 +185,12 @@ class KuduContext(val kuduMaster: String,
     * @return the Kudu schema
     */
   def createSchema(schema: StructType, keys: Seq[String]): Schema = {
-    val kuduCols = new util.ArrayList[ColumnSchema]()
-    // add the key columns first, in the order specified
-    for (key <- keys) {
-      val field = schema.fields(schema.fieldIndex(key))
-      val col = createColumn(field, isKey = true)
-      kuduCols.add(col)
-    }
-    // now add the non-key columns
-    for (field <- schema.fields.filter(field => !keys.contains(field.name))) {
-      val col = createColumn(field, isKey = false)
-      kuduCols.add(col)
-    }
-    new Schema(kuduCols)
-  }
-
-  private def createColumn(field: StructField, isKey: Boolean): ColumnSchema = {
-    val kt = kuduType(field.dataType)
-    val col = new ColumnSchema.ColumnSchemaBuilder(field.name, kt).key(isKey).nullable(field.nullable)
-    // Add ColumnTypeAttributesBuilder to DECIMAL columns
-    if (kt == Type.DECIMAL) {
-      val dt = field.dataType.asInstanceOf[DecimalType]
-      col.typeAttributes(
-        new ColumnTypeAttributesBuilder().precision(dt.precision).scale(dt.scale).build()
-      )
-    }
-    col.build()
+    kuduSchema(schema, keys)
   }
 
   /** Map Spark SQL type to Kudu type */
-  def kuduType(dt: DataType) : Type = dt match {
-    case DataTypes.BinaryType => Type.BINARY
-    case DataTypes.BooleanType => Type.BOOL
-    case DataTypes.StringType => Type.STRING
-    case DataTypes.TimestampType => Type.UNIXTIME_MICROS
-    case DataTypes.ByteType => Type.INT8
-    case DataTypes.ShortType => Type.INT16
-    case DataTypes.IntegerType => Type.INT32
-    case DataTypes.LongType => Type.INT64
-    case DataTypes.FloatType => Type.FLOAT
-    case DataTypes.DoubleType => Type.DOUBLE
-    case DecimalType() => Type.DECIMAL
-    case _ => throw new IllegalArgumentException(s"No support for Spark SQL type $dt")
+  def kuduType(dt: DataType) : Type = {
+    sparkTypeToKuduType(dt)
   }
 
   /**
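
KuduContext.kuduType is kept for compatibility but now simply delegates to
sparkTypeToKuduType. A quick illustrative sketch of that mapping, including
the unsupported-type error path (object name hypothetical):

import org.apache.spark.sql.types.{DataTypes, DecimalType}

import org.apache.kudu.Type
import org.apache.kudu.spark.kudu.SparkUtil.sparkTypeToKuduType

object TypeMappingSketch {
  def main(args: Array[String]): Unit = {
    // The same mapping KuduContext.kuduType now delegates to.
    assert(sparkTypeToKuduType(DataTypes.TimestampType) == Type.UNIXTIME_MICROS)
    assert(sparkTypeToKuduType(DecimalType(18, 4)) == Type.DECIMAL)

    // Unsupported Spark SQL types fail loudly instead of mapping silently.
    try sparkTypeToKuduType(DataTypes.CalendarIntervalType)
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}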

http://git-wip-us.apache.org/repos/asf/kudu/blob/e82dc1e2/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala
new file mode 100644
index 0000000..b9b34f1
--- /dev/null
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala
@@ -0,0 +1,123 @@
+package org.apache.kudu.spark.kudu
+
+import java.util
+
+import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder
+import org.apache.kudu.{ColumnSchema, ColumnTypeAttributes, Schema, Type}
+import org.apache.spark.sql.types._
+import org.apache.yetus.audience.{InterfaceAudience, InterfaceStability}
+
+import scala.collection.JavaConverters._
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+object SparkUtil {
+
+  /**
+    * Converts a Kudu [[Type]] to a Spark SQL [[DataType]].
+    *
+    * @param t the Kudu type
+    * @param a the Kudu type attributes
+    * @return the corresponding Spark SQL type
+    */
+  def kuduTypeToSparkType(t: Type, a: ColumnTypeAttributes): DataType = t match {
+    case Type.BOOL => BooleanType
+    case Type.INT8 => ByteType
+    case Type.INT16 => ShortType
+    case Type.INT32 => IntegerType
+    case Type.INT64 => LongType
+    case Type.UNIXTIME_MICROS => TimestampType
+    case Type.FLOAT => FloatType
+    case Type.DOUBLE => DoubleType
+    case Type.STRING => StringType
+    case Type.BINARY => BinaryType
+    case Type.DECIMAL => DecimalType(a.getPrecision, a.getScale)
+    case _ => throw new IllegalArgumentException(s"No support for Kudu type $t")
+  }
+
+  /**
+    * Converts a Spark SQL [[DataType]] to a Kudu [[Type]].
+    *
+    * @param dt the Spark SQL type
+    * @return the corresponding Kudu type
+    */
+  def sparkTypeToKuduType(dt: DataType) : Type = dt match {
+    case DataTypes.BinaryType => Type.BINARY
+    case DataTypes.BooleanType => Type.BOOL
+    case DataTypes.StringType => Type.STRING
+    case DataTypes.TimestampType => Type.UNIXTIME_MICROS
+    case DataTypes.ByteType => Type.INT8
+    case DataTypes.ShortType => Type.INT16
+    case DataTypes.IntegerType => Type.INT32
+    case DataTypes.LongType => Type.INT64
+    case DataTypes.FloatType => Type.FLOAT
+    case DataTypes.DoubleType => Type.DOUBLE
+    case DecimalType() => Type.DECIMAL
+    case _ => throw new IllegalArgumentException(s"No support for Spark SQL type $dt")
+  }
+
+  /**
+    * Generates a SparkSQL schema from a Kudu schema.
+    *
+    * @param kuduSchema the Kudu schema
+    * @param fields an optional column projection
+    * @return the SparkSQL schema
+    */
+  def sparkSchema(kuduSchema: Schema, fields: Option[Seq[String]] = None): StructType = {
+    val kuduColumns = fields match {
+      case Some(fieldNames) => fieldNames.map(kuduSchema.getColumn)
+      case None => kuduSchema.getColumns.asScala
+    }
+    val sparkColumns = kuduColumns.map { col =>
+      val sparkType = kuduTypeToSparkType(col.getType, col.getTypeAttributes)
+      StructField(col.getName, sparkType, col.isNullable)
+    }
+    StructType(sparkColumns)
+  }
+
+  /**
+    * Generates a Kudu schema from a SparkSQL schema.
+    *
+    * @param sparkSchema the SparkSQL schema
+    * @param keys the ordered names of key columns
+    * @return the Kudu schema
+    */
+  def kuduSchema(sparkSchema: StructType, keys: Seq[String]): Schema = {
+    val kuduCols = new util.ArrayList[ColumnSchema]()
+    // add the key columns first, in the order specified
+    for (key <- keys) {
+      val field = sparkSchema.fields(sparkSchema.fieldIndex(key))
+      val col = createColumnSchema(field, isKey = true)
+      kuduCols.add(col)
+    }
+    // now add the non-key columns
+    for (field <- sparkSchema.fields.filter(field => !keys.contains(field.name))) {
+      val col = createColumnSchema(field, isKey = false)
+      kuduCols.add(col)
+    }
+    new Schema(kuduCols)
+  }
+
+  /**
+    * Generates a Kudu column schema from a SparkSQL field.
+    *
+    * @param field the SparkSQL field
+    * @param isKey true if the column is a key
+    * @return the Kudu column schema
+    */
+  private def createColumnSchema(field: StructField, isKey: Boolean): ColumnSchema = {
+    val kt = sparkTypeToKuduType(field.dataType)
+    val col = new ColumnSchema.ColumnSchemaBuilder(field.name, kt)
+      .key(isKey)
+      .nullable(field.nullable)
+    // Add ColumnTypeAttributesBuilder to DECIMAL columns
+    if (kt == Type.DECIMAL) {
+      val dt = field.dataType.asInstanceOf[DecimalType]
+      col.typeAttributes(
+        new ColumnTypeAttributesBuilder().precision(dt.precision).scale(dt.scale).build()
+      )
+    }
+    col.build()
+  }
+
+}
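
One detail in the new file: DECIMAL is the only type whose Spark mapping
depends on the column's type attributes. A short illustrative sketch
(hypothetical object name; null attributes are only safe for non-DECIMAL
types, since the match above ignores them for everything else):

import org.apache.spark.sql.types.{DecimalType, IntegerType}

import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder
import org.apache.kudu.Type
import org.apache.kudu.spark.kudu.SparkUtil.kuduTypeToSparkType

object DecimalAttributesSketch {
  def main(args: Array[String]): Unit = {
    // Precision and scale flow from the Kudu attributes into Spark's DecimalType.
    val attrs = new ColumnTypeAttributesBuilder().precision(12).scale(3).build()
    assert(kuduTypeToSparkType(Type.DECIMAL, attrs) == DecimalType(12, 3))

    // Every other type ignores the attributes argument, so null is acceptable here.
    assert(kuduTypeToSparkType(Type.INT32, null) == IntegerType)
  }
}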

