phoenix-commits mailing list archives

From: jmaho...@apache.org
Subject: phoenix git commit: PHOENIX-2196 phoenix-spark should automatically convert DataFrame field names (Randy Gelhausen)
Date: Mon, 28 Sep 2015 16:40:32 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 9af1327b1 -> c3907d14a


PHOENIX-2196 phoenix-spark should automatically convert DataFrame field names (Randy Gelhausen)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c3907d14
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c3907d14
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c3907d14

Branch: refs/heads/4.x-HBase-0.98
Commit: c3907d14acdcaa7c48d75cf068c77143fe5ff4f0
Parents: 9af1327
Author: Josh Mahonin <jmahonin@apache.org>
Authored: Sun Aug 23 14:18:55 2015 -0400
Committer: Josh Mahonin <jmahonin@interset.com>
Committed: Mon Sep 28 12:40:24 2015 -0400

----------------------------------------------------------------------
 .../apache/phoenix/spark/PhoenixSparkIT.scala   | 22 ++++++++++++++++++--
 .../phoenix/spark/DataFrameFunctions.scala      | 15 +++++++------
 2 files changed, 29 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
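
For reference, SchemaUtil.normalizeIdentifier applies Phoenix's identifier
rules: unquoted names are upper-cased, while double-quoted names keep their
case and lose the surrounding quotes. A minimal sketch of that behavior
(illustrative only, not part of this commit; assumes phoenix-core on the
classpath):

    import org.apache.phoenix.util.SchemaUtil

    object NormalizeSketch extends App {
      // Unquoted identifiers are upper-cased; quoted ones keep their case
      // and have the surrounding quotes stripped.
      Seq("id", "table1_id", "\"t2col1\"").foreach { name =>
        println(s"$name -> ${SchemaUtil.normalizeIdentifier(name)}")
      }
      // Expected: id -> ID, table1_id -> TABLE1_ID, "t2col1" -> t2col1
    }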


http://git-wip-us.apache.org/repos/asf/phoenix/blob/c3907d14/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index 2889464..a4e37e1 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -23,8 +23,8 @@ import org.apache.phoenix.query.BaseTest
 import org.apache.phoenix.schema.{TableNotFoundException, ColumnNotFoundException}
 import org.apache.phoenix.schema.types.PVarchar
 import org.apache.phoenix.util.{SchemaUtil, ColumnInfo}
-import org.apache.spark.sql.{SaveMode, execution, SQLContext}
-import org.apache.spark.sql.types.{LongType, DataType, StringType, StructField}
+import org.apache.spark.sql.{Row, SaveMode, execution, SQLContext}
+import org.apache.spark.sql.types._
 import org.apache.spark.{SparkConf, SparkContext}
 import org.joda.time.DateTime
 import org.scalatest._
@@ -447,4 +447,22 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
     count shouldEqual 1L
 
   }
+
+  test("Ensure DataFrame field normalization (PHOENIX-2196)") {
+    val rdd1 = sc
+      .parallelize(Seq((1L,1L,"One"),(2L,2L,"Two")))
+      .map(p => Row(p._1, p._2, p._3))
+
+    val sqlContext = new SQLContext(sc)
+
+    val schema = StructType(Seq(
+      StructField("id", LongType, nullable = false),
+      StructField("table1_id", LongType, nullable = true),
+      StructField("\"t2col1\"", StringType, nullable = true)
+    ))
+
+    val df = sqlContext.createDataFrame(rdd1, schema)
+
+    df.saveToPhoenix("TABLE2", zkUrl = Some(quorumAddress))
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c3907d14/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
index 5042eaa..9408210 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
@@ -17,6 +17,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.NullWritable
 import org.apache.phoenix.mapreduce.PhoenixOutputFormat
 import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil}
+import org.apache.phoenix.util.SchemaUtil
 import org.apache.spark.Logging
 import org.apache.spark.sql.DataFrame
 import scala.collection.JavaConversions._
@@ -26,16 +27,18 @@ class DataFrameFunctions(data: DataFrame) extends Logging with Serializable {
   def saveToPhoenix(tableName: String, conf: Configuration = new Configuration,
                     zkUrl: Option[String] = None): Unit = {
 
+
+    // Retrieve the schema field names and normalize to Phoenix, need to do this outside of mapPartitions
+    val fieldArray = data.schema.fieldNames.map(x => SchemaUtil.normalizeIdentifier(x))
+
     // Create a configuration object to use for saving
-    @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, data.schema.fieldNames, zkUrl, Some(conf))
+    @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, Some(conf))
 
     // Retrieve the zookeeper URL
     val zkUrlFinal = ConfigurationUtil.getZookeeperURL(outConfig)
 
-     // Retrieve the schema field names, need to do this outside of mapPartitions
-     val fieldArray = data.schema.fieldNames
-     // Map the row objects into PhoenixRecordWritable
-     val phxRDD = data.mapPartitions{ rows =>
+    // Map the row objects into PhoenixRecordWritable
+    val phxRDD = data.mapPartitions{ rows =>
  
        // Create a within-partition config to retrieve the ColumnInfo list
       @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrlFinal)
@@ -57,4 +60,4 @@ class DataFrameFunctions(data: DataFrame) extends Logging with Serializable {
       outConfig
     )
   }
-}
\ No newline at end of file
+}
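
For anyone picking this change up downstream: with the normalization in
place, a DataFrame whose fields use lower-case or quoted names can be saved
without manually upper-casing the schema first. A minimal usage sketch (not
from this commit; assumes a table created as
CREATE TABLE TABLE2 (ID BIGINT NOT NULL PRIMARY KEY, TABLE1_ID BIGINT, "t2col1" VARCHAR)
and a ZooKeeper quorum reachable at localhost:2181):

    import org.apache.phoenix.spark._
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.types._
    import org.apache.spark.{SparkConf, SparkContext}

    object SaveSketch extends App {
      val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("phoenix-2196"))
      val sqlContext = new SQLContext(sc)

      val rdd = sc.parallelize(Seq(Row(1L, 1L, "One"), Row(2L, 2L, "Two")))
      val schema = StructType(Seq(
        StructField("id", LongType, nullable = false),          // normalized to ID
        StructField("table1_id", LongType, nullable = true),    // normalized to TABLE1_ID
        StructField("\"t2col1\"", StringType, nullable = true)  // quotes stripped, case kept
      ))

      // saveToPhoenix comes from the implicit conversion in org.apache.phoenix.spark._;
      // field names are now normalized before the upsert, so the lower-case
      // names above no longer raise ColumnNotFoundException.
      sqlContext.createDataFrame(rdd, schema).saveToPhoenix("TABLE2", zkUrl = Some("localhost:2181"))
    }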

