Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 31A29200CB0 for ; Fri, 23 Jun 2017 18:28:19 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 30E57160BD4; Fri, 23 Jun 2017 16:28:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 770C7160BCA for ; Fri, 23 Jun 2017 18:28:18 +0200 (CEST) Received: (qmail 77441 invoked by uid 500); 23 Jun 2017 16:28:17 -0000 Mailing-List: contact commits-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list commits@spark.apache.org Received: (qmail 77431 invoked by uid 99); 23 Jun 2017 16:28:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Jun 2017 16:28:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8E21ADFC25; Fri, 23 Jun 2017 16:28:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lixiao@apache.org To: commits@spark.apache.org Message-Id: <58f4c822afa04247a223a0b3a6ec792e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: spark git commit: [SPARK-21144][SQL] Print a warning if the data schema and partition schema have the duplicate columns Date: Fri, 23 Jun 2017 16:28:17 +0000 (UTC) archived-at: Fri, 23 Jun 2017 16:28:19 -0000 Repository: spark Updated Branches: refs/heads/master 5dca10b8f -> f3dea6079 [SPARK-21144][SQL] Print a warning if the data schema and partition schema have the duplicate columns ## What changes were proposed in this pull request? The current master outputs unexpected results when the data schema and partition schema have the duplicate columns: ``` withTempPath { dir => val basePath = dir.getCanonicalPath spark.range(0, 3).toDF("foo").write.parquet(new Path(basePath, "foo=1").toString) spark.range(0, 3).toDF("foo").write.parquet(new Path(basePath, "foo=a").toString) spark.read.parquet(basePath).show() } +---+ |foo| +---+ | 1| | 1| | a| | a| | 1| | a| +---+ ``` This patch added code to print a warning when the duplication found. ## How was this patch tested? Manually checked. Author: Takeshi Yamamuro Closes #18375 from maropu/SPARK-21144-3. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f3dea607 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f3dea607 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f3dea607 Branch: refs/heads/master Commit: f3dea60793d86212ba1068e88ad89cb3dcf07801 Parents: 5dca10b Author: Takeshi Yamamuro Authored: Fri Jun 23 09:28:02 2017 -0700 Committer: gatorsmile Committed: Fri Jun 23 09:28:02 2017 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/util/SchemaUtils.scala | 53 ++++++++++++++++++++ .../sql/execution/datasources/DataSource.scala | 6 +++ 2 files changed, 59 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f3dea607/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala new file mode 100644 index 0000000..e881685 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.util + +import org.apache.spark.internal.Logging + + +/** + * Utils for handling schemas. + * + * TODO: Merge this file with [[org.apache.spark.ml.util.SchemaUtils]]. + */ +private[spark] object SchemaUtils extends Logging { + + /** + * Checks if input column names have duplicate identifiers. Prints a warning message if + * the duplication exists. + * + * @param columnNames column names to check + * @param colType column type name, used in a warning message + * @param caseSensitiveAnalysis whether duplication checks should be case sensitive or not + */ + def checkColumnNameDuplication( + columnNames: Seq[String], colType: String, caseSensitiveAnalysis: Boolean): Unit = { + val names = if (caseSensitiveAnalysis) { + columnNames + } else { + columnNames.map(_.toLowerCase) + } + if (names.distinct.length != names.length) { + val duplicateColumns = names.groupBy(identity).collect { + case (x, ys) if ys.length > 1 => s"`$x`" + } + logWarning(s"Found duplicate column(s) $colType: ${duplicateColumns.mkString(", ")}. " + + "You might need to assign different column names.") + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f3dea607/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 08c78e6..75e5306 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.sources._ import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.{CalendarIntervalType, StructType} +import org.apache.spark.sql.util.SchemaUtils import org.apache.spark.util.Utils /** @@ -182,6 +183,11 @@ case class DataSource( throw new AnalysisException( s"Unable to infer schema for $format. It must be specified manually.") } + + SchemaUtils.checkColumnNameDuplication( + (dataSchema ++ partitionSchema).map(_.name), "in the data schema and the partition schema", + sparkSession.sessionState.conf.caseSensitiveAnalysis) + (dataSchema, partitionSchema) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org