From: yhuai@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-7565] [SQL] fix MapType in JsonRDD
Date: Thu, 21 May 2015 16:58:51 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master feb3a9d3f -> a25c1ab8f


[SPARK-7565] [SQL] fix MapType in JsonRDD

The keys of a Map in JsonRDD should be converted into UTF8String (as should
failed records). Thanks to yhuai viirya

Closes #6084

Author: Davies Liu

Closes #6299 from davies/string_in_json and squashes the following commits:

0dbf559 [Davies Liu] improve test, fix corrupt record
6836a80 [Davies Liu] move unit tests into Scala
b97af11 [Davies Liu] fix MapType in JsonRDD


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a25c1ab8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a25c1ab8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a25c1ab8

Branch: refs/heads/master
Commit: a25c1ab8f04a4e19d82ff4c18a0b1689d8b3ddac
Parents: feb3a9d
Author: Davies Liu
Authored: Thu May 21 09:58:47 2015 -0700
Committer: Yin Huai
Committed: Thu May 21 09:58:47 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/json/JacksonParser.scala  |  8 +++---
 .../org/apache/spark/sql/json/JsonRDD.scala    | 16 ++++++-----
 .../org/apache/spark/sql/json/JsonSuite.scala  | 28 +++++++++++++++++++-
 3 files changed, 41 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
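The substance of the change is narrow: JSON object keys, and the raw text of
records that fail to parse, must be stored as Spark's internal UTF8String
rather than java.lang.String before they land in rows, otherwise MapType
columns and corrupt-record columns carry the wrong runtime type. The
self-contained sketch below illustrates the key-conversion pattern outside of
Spark; its UTF8String case class is only a stand-in for Spark's internal type
(the real one, used in the diff below, provides an apply(String) factory), and
MapKeySketch is a name invented for this example.

// Stand-in for Spark's internal UTF8String type; illustration only.
case class UTF8String(s: String)

object MapKeySketch {
  def main(args: Array[String]): Unit = {
    // A map as Jackson would hand it back: java.lang.String keys.
    val parsed: Map[String, Any] = Map("a" -> 1, "b" -> 2)

    // Pre-fix behaviour: only the values were coerced (identity stands in
    // for the real coercion here); keys stayed as java.lang.String.
    val valuesOnly: Map[String, Any] = parsed.mapValues(identity).map(identity)

    // Post-fix behaviour: a single pass converts the keys as well, mirroring
    // convertMap in JacksonParser and enforceCorrectType in JsonRDD.
    val converted: Map[UTF8String, Any] = parsed.map {
      case (k, v) => (UTF8String(k), v)
    }

    println(valuesOnly) // Map(a -> 1, b -> 2)
    println(converted)  // Map(UTF8String(a) -> 1, UTF8String(b) -> 2)
  }
}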
http://git-wip-us.apache.org/repos/asf/spark/blob/a25c1ab8/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
index 8161151..0e22375 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
@@ -150,10 +150,10 @@ private[sql] object JacksonParser {
   private def convertMap(
       factory: JsonFactory,
       parser: JsonParser,
-      valueType: DataType): Map[String, Any] = {
-    val builder = Map.newBuilder[String, Any]
+      valueType: DataType): Map[UTF8String, Any] = {
+    val builder = Map.newBuilder[UTF8String, Any]
     while (nextUntil(parser, JsonToken.END_OBJECT)) {
-      builder += parser.getCurrentName -> convertField(factory, parser, valueType)
+      builder += UTF8String(parser.getCurrentName) -> convertField(factory, parser, valueType)
     }
 
     builder.result()
@@ -181,7 +181,7 @@ private[sql] object JacksonParser {
         val row = new GenericMutableRow(schema.length)
         for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) {
           require(schema(corruptIndex).dataType == StringType)
-          row.update(corruptIndex, record)
+          row.update(corruptIndex, UTF8String(record))
         }
 
         Seq(row)

http://git-wip-us.apache.org/repos/asf/spark/blob/a25c1ab8/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 4c32710..037a6d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -20,18 +20,18 @@ package org.apache.spark.sql.json
 
 import java.sql.Timestamp
 
 import scala.collection.Map
-import scala.collection.convert.Wrappers.{JMapWrapper, JListWrapper}
+import scala.collection.convert.Wrappers.{JListWrapper, JMapWrapper}
 
-import com.fasterxml.jackson.core.{JsonGenerator, JsonProcessingException}
+import com.fasterxml.jackson.core.JsonProcessingException
 import com.fasterxml.jackson.databind.ObjectMapper
 
+import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
-import org.apache.spark.Logging
 
 private[sql] object JsonRDD extends Logging {
@@ -318,7 +318,8 @@ private[sql] object JsonRDD extends Logging {
 
           parsed
         } catch {
-          case e: JsonProcessingException => Map(columnNameOfCorruptRecords -> record) :: Nil
+          case e: JsonProcessingException =>
+            Map(columnNameOfCorruptRecords -> UTF8String(record)) :: Nil
         }
       }
     })
@@ -422,7 +423,10 @@ private[sql] object JsonRDD extends Logging {
         value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
       case MapType(StringType, valueType, _) =>
         val map = value.asInstanceOf[Map[String, Any]]
-        map.mapValues(enforceCorrectType(_, valueType)).map(identity)
+        map.map {
+          case (k, v) =>
+            (UTF8String(k), enforceCorrectType(v, valueType))
+        }.map(identity)
       case struct: StructType => asRow(value.asInstanceOf[Map[String, Any]], struct)
      case DateType => toDate(value)
       case TimestampType => toTimestamp(value)
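One detail in the JsonRDD hunk above is worth spelling out: in Scala 2.10-2.12,
Map.mapValues returns a lazy view that re-applies its function on every lookup
and is not serializable, so the trailing .map(identity) exists to force a
strict, serializable Map. The fix keeps that trailing call but switches to a
single map over the entries so the keys can be converted to UTF8String at the
same time. A minimal sketch of the laziness behaviour, in plain Scala with no
Spark dependencies (MapValuesLazinessSketch is a name invented for this
example):

object MapValuesLazinessSketch {
  def main(args: Array[String]): Unit = {
    var calls = 0
    val m = Map("a" -> 1)

    // mapValues returns a lazy view: the function re-runs on every lookup,
    // and the view itself is not serializable, which matters for values
    // captured by Spark tasks.
    val view = m.mapValues { v => calls += 1; v * 2 }
    view("a")
    view("a")
    println(calls) // 2 -- recomputed on each access

    // Appending .map(identity) materializes a strict Map: the function runs
    // exactly once per entry, and never again on lookup. This is why both
    // the pre-fix and post-fix JsonRDD code end with .map(identity).
    calls = 0
    val strict = m.mapValues { v => calls += 1; v * 2 }.map(identity)
    strict("a")
    strict("a")
    println(calls) // 1 -- evaluated once while materializing
  }
}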
http://git-wip-us.apache.org/repos/asf/spark/blob/a25c1ab8/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 6f747e5..7e6eeba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -25,7 +25,6 @@ import org.scalactic.Tolerance._
 
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.catalyst.util.DateUtils
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.json.InferSchema.compatibleType
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.test.TestSQLContext
@@ -1074,4 +1073,31 @@ class JsonSuite extends QueryTest {
     assert(StructType(Seq()) === emptySchema)
   }
 
+  test("SPARK-7565 MapType in JsonRDD") {
+    val useStreaming = getConf(SQLConf.USE_JACKSON_STREAMING_API, "true")
+    val oldColumnNameOfCorruptRecord = TestSQLContext.conf.columnNameOfCorruptRecord
+    TestSQLContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")
+
+    val schemaWithSimpleMap = StructType(
+      StructField("map", MapType(StringType, IntegerType, true), false) :: Nil)
+    try {
+      for (useStreaming <- List("true", "false")) {
+        setConf(SQLConf.USE_JACKSON_STREAMING_API, useStreaming)
+        val temp = Utils.createTempDir().getPath
+
+        val df = read.schema(schemaWithSimpleMap).json(mapType1)
+        df.write.mode("overwrite").parquet(temp)
+        // order of MapType is not defined
+        assert(read.parquet(temp).count() == 5)
+
+        val df2 = read.json(corruptRecords)
+        df2.write.mode("overwrite").parquet(temp)
+        checkAnswer(read.parquet(temp), df2.collect())
+      }
+    } finally {
+      setConf(SQLConf.USE_JACKSON_STREAMING_API, useStreaming)
+      setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, oldColumnNameOfCorruptRecord)
+    }
+  }
+
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org