From: wenchen@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-17358][SQL] Cached table (parquet/orc) should be shared between beelines
Date: Tue, 6 Sep 2016 02:57:33 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master afb3d5d30 -> 64e826f91

[SPARK-17358][SQL] Cached table (parquet/orc) should be shared between beelines

## What changes were proposed in this pull request?

A cached table (Parquet/ORC) could not be shared between Beeline sessions, because the `sameResult` method used by `CacheManager` always returned false when comparing two `HadoopFsRelation`s created in different Beeline sessions: their `sparkSession` fields differ, and case-class equality includes every field in the first parameter list. So we make `sparkSession` a curried parameter, moving it into a second parameter list that the generated `equals`/`hashCode` ignore. A minimal sketch of that mechanism follows.
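For context, here is a minimal, standalone Scala sketch of the language mechanism the patch relies on. It is not Spark code; `Session` and `Relation` are hypothetical stand-ins. Scala case classes derive `equals`, `hashCode`, and `toString` from the first parameter list only, so a field in a second list no longer breaks equality.

```scala
// Illustrative only: `Session` and `Relation` are hypothetical stand-ins
// for SparkSession and HadoopFsRelation.
class Session

// `session` sits in a second parameter list, so the compiler-generated
// equals/hashCode consider only `path` and `schema`.
case class Relation(path: String, schema: String)(val session: Session)

object CurriedEqualityDemo extends App {
  val r1 = Relation("/data/src_pqt", "key int, value string")(new Session)
  val r2 = Relation("/data/src_pqt", "key int, value string")(new Session)

  // Equal despite being built with different sessions -- exactly what
  // CacheManager's sameResult check needs across Beeline connections.
  println(r1 == r2) // true
}
```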
## How was this patch tested?

Beeline1
```
1: jdbc:hive2://localhost:10000> CACHE TABLE src_pqt;
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (5.143 seconds)
1: jdbc:hive2://localhost:10000> EXPLAIN SELECT * FROM src_pqt;
+--------------------------------------------------------------------------------------------------------------+--+
| plan                                                                                                          |
+--------------------------------------------------------------------------------------------------------------+--+
| == Physical Plan ==
InMemoryTableScan [key#49, value#50]
   +- InMemoryRelation [key#49, value#50], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `src_pqt`
         +- *FileScan parquet default.src_pqt[key#0,value#1] Batched: true, Format: ParquetFormat, InputPaths: hdfs://199.0.0.1:9000/qiyadong/src_pqt, PartitionFilters: [], PushedFilters: [], ReadSchema: struct  |
+--------------------------------------------------------------------------------------------------------------+--+
```

Beeline2
```
0: jdbc:hive2://localhost:10000> EXPLAIN SELECT * FROM src_pqt;
+--------------------------------------------------------------------------------------------------------------+--+
| plan                                                                                                          |
+--------------------------------------------------------------------------------------------------------------+--+
| == Physical Plan ==
InMemoryTableScan [key#68, value#69]
   +- InMemoryRelation [key#68, value#69], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `src_pqt`
         +- *FileScan parquet default.src_pqt[key#0,value#1] Batched: true, Format: ParquetFormat, InputPaths: hdfs://199.0.0.1:9000/qiyadong/src_pqt, PartitionFilters: [], PushedFilters: [], ReadSchema: struct  |
+--------------------------------------------------------------------------------------------------------------+--+
```
Author: Yadong Qi
Closes #14913 from watermen/SPARK-17358.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64e826f9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64e826f9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64e826f9

Branch: refs/heads/master
Commit: 64e826f91eabb1a22d3d163d71fbb7b6d2185f25
Parents: afb3d5d
Author: Yadong Qi
Authored: Tue Sep 6 10:57:21 2016 +0800
Committer: Wenchen Fan
Committed: Tue Sep 6 10:57:21 2016 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/execution/datasources/DataSource.scala    | 6 ++----
 .../spark/sql/execution/datasources/fileSourceInterfaces.scala | 4 ++--
 .../sql/execution/datasources/FileSourceStrategySuite.scala    | 3 ++-
 .../scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 3 +--
 4 files changed, 7 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/64e826f9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5968db8..9c99a80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -351,13 +351,12 @@ case class DataSource(
         }
 
         HadoopFsRelation(
-          sparkSession,
           fileCatalog,
           partitionSchema = fileCatalog.partitionSpec().partitionColumns,
           dataSchema = dataSchema,
           bucketSpec = None,
           format,
-          options)
+          options)(sparkSession)
 
       // This is a non-streaming file based datasource.
       case (format: FileFormat, _) =>
@@ -409,13 +408,12 @@ case class DataSource(
         }
 
         HadoopFsRelation(
-          sparkSession,
           fileCatalog,
           partitionSchema = fileCatalog.partitionSpec().partitionColumns,
           dataSchema = dataSchema.asNullable,
           bucketSpec = bucketSpec,
           format,
-          caseInsensitiveOptions)
+          caseInsensitiveOptions)(sparkSession)
 
       case _ =>
         throw new AnalysisException(
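One side effect of the curried signature is worth noting before the next hunk, since it explains the `FileSourceStrategySuite` change further down: the compiler-generated `copy` method only supplies defaults for the first parameter list, so callers must pass the second list explicitly. A hypothetical sketch (same illustrative stand-ins as above, not Spark code):

```scala
// Hypothetical stand-ins mirroring the HadoopFsRelation change.
class Session
case class Relation(path: String, bucketed: Boolean)(val session: Session)

object CopyDemo extends App {
  val r = Relation("/data/src_pqt", bucketed = false)(new Session)

  // copy() defaults exist only for the first parameter list; the second
  // list must be re-supplied by hand -- hence r.copy(...)(r.sparkSession)
  // in the test suite below.
  val r2 = r.copy(bucketed = true)(r.session)
  println(r2)
}
```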
http://git-wip-us.apache.org/repos/asf/spark/blob/64e826f9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index e03a232..7e40c35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -134,13 +134,13 @@ abstract class OutputWriter {
  * @param options Configuration used when reading / writing data.
  */
 case class HadoopFsRelation(
-    sparkSession: SparkSession,
     location: FileCatalog,
     partitionSchema: StructType,
     dataSchema: StructType,
     bucketSpec: Option[BucketSpec],
     fileFormat: FileFormat,
-    options: Map[String, String]) extends BaseRelation with FileRelation {
+    options: Map[String, String])(val sparkSession: SparkSession)
+  extends BaseRelation with FileRelation {
 
   override def sqlContext: SQLContext = sparkSession.sqlContext


http://git-wip-us.apache.org/repos/asf/spark/blob/64e826f9/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 09fd750..45411fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -508,7 +508,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
       val bucketed = df.queryExecution.analyzed transform {
         case l @ LogicalRelation(r: HadoopFsRelation, _, _) =>
           l.copy(relation =
-            r.copy(bucketSpec = Some(BucketSpec(numBuckets = buckets, "c1" :: Nil, Nil))))
+            r.copy(bucketSpec =
+              Some(BucketSpec(numBuckets = buckets, "c1" :: Nil, Nil)))(r.sparkSession))
       }
       Dataset.ofRows(spark, bucketed)
     } else {


http://git-wip-us.apache.org/repos/asf/spark/blob/64e826f9/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d31a8d6..c48d4ed 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -249,13 +249,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         }
 
         val relation = HadoopFsRelation(
-          sparkSession = sparkSession,
           location = fileCatalog,
           partitionSchema = partitionSchema,
           dataSchema = inferredSchema,
           bucketSpec = bucketSpec,
           fileFormat = defaultSource,
-          options = options)
+          options = options)(sparkSession = sparkSession)
 
         val created = LogicalRelation(relation, catalogTable = Some(metastoreRelation.catalogTable))
         cachedDataSourceTables.put(tableIdentifier, created)
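As a closing illustration (not part of the patch), the same sharing effect can be sketched locally without a Thrift server: `newSession()` returns a session that shares the parent's `SharedState`, and with it the `CacheManager`, so after this fix a relation cached through one session should be recognized by another. The path and object name below are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// A local two-session check, analogous to the two Beeline connections above.
object SharedCacheDemo extends App {
  val spark1 = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
  val spark2 = spark1.newSession() // shares SharedState, hence the CacheManager

  spark1.range(100).write.mode("overwrite").parquet("/tmp/src_pqt")

  // Cache the relation through the first session.
  spark1.read.parquet("/tmp/src_pqt").cache().count()

  // With sparkSession curried out of HadoopFsRelation's equality, sameResult
  // can match across sessions, and this plan should show InMemoryTableScan.
  spark2.read.parquet("/tmp/src_pqt").explain()
}
```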