From: jmahonin@apache.org
To: commits@phoenix.apache.org
Reply-To: dev@phoenix.apache.org
Subject: phoenix git commit: PHOENIX-2196 phoenix-spark should automatically convert DataFrame field names (Randy Gelhausen)
Date: Mon, 28 Sep 2015 16:35:54 +0000 (UTC)

Repository: phoenix
Updated Branches:
  refs/heads/master 5ecd4967f -> eb9fab429


PHOENIX-2196 phoenix-spark should automatically convert DataFrame field names (Randy Gelhausen)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/eb9fab42
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/eb9fab42
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/eb9fab42

Branch: refs/heads/master
Commit: eb9fab42911b08f9e817b313f2250f4685ba13a2
Parents: 5ecd496
Author: Josh Mahonin
Authored: Sun Aug 23 14:18:55 2015 -0400
Committer: Josh Mahonin
Committed: Mon Sep 28 12:35:25 2015 -0400

----------------------------------------------------------------------
 .../apache/phoenix/spark/PhoenixSparkIT.scala   | 22 ++++++++++++++++++--
 .../phoenix/spark/DataFrameFunctions.scala      | 15 +++++++------
 2 files changed, 29 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
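Background for the diffs below: the fix routes every DataFrame field name
through SchemaUtil.normalizeIdentifier() before the output configuration is
built, so Spark field names follow the same rules as identifiers in Phoenix
DDL. A minimal sketch of that normalization, runnable with phoenix-core on
the classpath (the expected outputs in the comments are assumptions based on
Phoenix's standard identifier rules, not output from this commit):

    import org.apache.phoenix.util.SchemaUtil

    object NormalizeIdentifierSketch {
      def main(args: Array[String]): Unit = {
        // Unquoted identifiers are upper-cased, matching how Phoenix
        // treats undelimited column names in DDL.
        println(SchemaUtil.normalizeIdentifier("table1_id"))  // TABLE1_ID

        // Double-quoted identifiers are case-sensitive: the quotes are
        // stripped and the original case is preserved.
        println(SchemaUtil.normalizeIdentifier("\"t2col1\""))  // t2col1
      }
    }

This is why the new integration test can declare lowercase field names
("id", "table1_id") against a Phoenix table whose columns are stored
upper-case.
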
http://git-wip-us.apache.org/repos/asf/phoenix/blob/eb9fab42/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index e1c9df4..1a28b60 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -23,8 +23,8 @@ import org.apache.phoenix.query.BaseTest
 import org.apache.phoenix.schema.{TableNotFoundException, ColumnNotFoundException}
 import org.apache.phoenix.schema.types.PVarchar
 import org.apache.phoenix.util.{SchemaUtil, ColumnInfo}
-import org.apache.spark.sql.{SaveMode, execution, SQLContext}
-import org.apache.spark.sql.types.{LongType, DataType, StringType, StructField}
+import org.apache.spark.sql.{Row, SaveMode, execution, SQLContext}
+import org.apache.spark.sql.types._
 import org.apache.spark.{SparkConf, SparkContext}
 import org.joda.time.DateTime
 import org.scalatest._
@@ -448,4 +448,22 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
     count shouldEqual 1L
   }
 
+
+  test("Ensure DataFrame field normalization (PHOENIX-2196)") {
+    val rdd1 = sc
+      .parallelize(Seq((1L,1L,"One"),(2L,2L,"Two")))
+      .map(p => Row(p._1, p._2, p._3))
+
+    val sqlContext = new SQLContext(sc)
+
+    val schema = StructType(Seq(
+      StructField("id", LongType, nullable = false),
+      StructField("table1_id", LongType, nullable = true),
+      StructField("\"t2col1\"", StringType, nullable = true)
+    ))
+
+    val df = sqlContext.createDataFrame(rdd1, schema)
+
+    df.saveToPhoenix("TABLE2", zkUrl = Some(quorumAddress))
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/eb9fab42/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
index 5042eaa..9408210 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
@@ -17,6 +17,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.NullWritable
 import org.apache.phoenix.mapreduce.PhoenixOutputFormat
 import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil}
+import org.apache.phoenix.util.SchemaUtil
 import org.apache.spark.Logging
 import org.apache.spark.sql.DataFrame
 import scala.collection.JavaConversions._
@@ -26,16 +27,18 @@ class DataFrameFunctions(data: DataFrame) extends Logging with Serializable {
 
   def saveToPhoenix(tableName: String, conf: Configuration = new Configuration,
                     zkUrl: Option[String] = None): Unit = {
+
+    // Retrieve the schema field names and normalize to Phoenix, need to do this outside of mapPartitions
+    val fieldArray = data.schema.fieldNames.map(x => SchemaUtil.normalizeIdentifier(x))
+
     // Create a configuration object to use for saving
-    @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, data.schema.fieldNames, zkUrl, Some(conf))
+    @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, Some(conf))
 
     // Retrieve the zookeeper URL
     val zkUrlFinal = ConfigurationUtil.getZookeeperURL(outConfig)
 
-    // Retrieve the schema field names, need to do this outside of mapPartitions
-    val fieldArray = data.schema.fieldNames
-
-    // Map the row objects into PhoenixRecordWritable
-    val phxRDD = data.mapPartitions{ rows =>
+    // Map the row objects into PhoenixRecordWritable
+    val phxRDD = data.mapPartitions{ rows =>
 
       // Create a within-partition config to retrieve the ColumnInfo list
       @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrlFinal)
@@ -57,4 +60,4 @@ class DataFrameFunctions(data: DataFrame) extends Logging with Serializable {
       outConfig
     )
   }
-}
\ No newline at end of file
+}
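
For reference, a hypothetical end-to-end sketch of the fixed behavior,
mirroring the integration test above. The CREATE TABLE statement, app name,
and the "localhost:2181" quorum address are assumptions for illustration
only; saveToPhoenix and the phoenix-spark implicits come from the module
itself:

    // Assumes a Phoenix table created beforehand, e.g.:
    //   CREATE TABLE TABLE2 (ID BIGINT NOT NULL PRIMARY KEY,
    //                        TABLE1_ID BIGINT, "t2col1" VARCHAR)
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.types._
    import org.apache.phoenix.spark._  // brings saveToPhoenix into scope

    object SaveLowercaseFieldsSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("phoenix-2196-sketch"))
        val sqlContext = new SQLContext(sc)

        // Lowercase, unquoted field names: before this commit they did not
        // match the upper-case Phoenix column names and the save failed;
        // they are now normalized automatically.
        val schema = StructType(Seq(
          StructField("id", LongType, nullable = false),
          StructField("table1_id", LongType, nullable = true),
          StructField("\"t2col1\"", StringType, nullable = true)))

        val rows = sc.parallelize(Seq(Row(1L, 1L, "One"), Row(2L, 2L, "Two")))
        val df = sqlContext.createDataFrame(rows, schema)

        df.saveToPhoenix("TABLE2", zkUrl = Some("localhost:2181"))

        sc.stop()
      }
    }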