From: rxin@apache.org
To: commits@spark.apache.org
Message-Id: <8086571505774e21a775bfa49ea884e3@git.apache.org>
Subject: spark git commit: [SPARK-18362][SQL] Use TextFileFormat in implementation of CSVFileFormat
Date: Sat, 3 Dec 2016 05:14:36 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master c7c726595 -> 7c33b0fd0


[SPARK-18362][SQL] Use TextFileFormat in implementation of CSVFileFormat

## What changes were proposed in this pull request?

This patch significantly improves the IO / file-listing performance of schema inference in Spark's built-in CSV data source.

Previously, this data source used the legacy `SparkContext.hadoopFile` and `SparkContext.hadoopRDD` methods to read files during its schema inference step, causing huge file-listing bottlenecks on the driver. This patch refactors that logic to use Spark SQL's `text` data source to read files during this step. The text data source still performs some unnecessary file listing (since in theory we have already resolved the table prior to schema inference and therefore should be able to scan without performing _any_ extra listing), but that listing is much faster and takes place in parallel. In one production workload operating over tens of thousands of files, this change reduced schema inference time from 7 minutes to 2 minutes.

A similar problem also affects the JSON file format. This patch originally fixed that as well, but I've decided to split that change into a separate patch so as not to conflict with changes in another JSON PR.

## How was this patch tested?

Existing unit tests, plus manual benchmarking on a production workload.

Author: Josh Rosen

Closes #15813 from JoshRosen/use-text-data-source-in-csv-and-json.
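For readers skimming the description, the sketch below illustrates the shape of the change at the public-API level: reading the raw lines through Spark SQL's text data source as a `Dataset[String]` instead of through `SparkContext.hadoopFile`, so that file listing and reading go through the file-scan machinery rather than the legacy driver-side path. This is a minimal illustration only, not the internal code path touched by the diff below; the `SparkSession` setup, the input path, and the toy header-splitting step are assumptions made up for the example.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object TextSourceInferenceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("csv-schema-inference-sketch")
      .master("local[*]")
      .getOrCreate()

    // Hypothetical input location; any directory of CSV files works here.
    val inputPath = "/tmp/example-csv-dir"

    // Read raw lines through the text data source. Listing and reading go
    // through Spark SQL's file-scan machinery (parallel, distributed) rather
    // than the legacy SparkContext.hadoopFile path, whose file listing
    // bottlenecks on the driver.
    val lines: Dataset[String] = spark.read.textFile(inputPath)

    // Toy stand-in for the header step of schema inference: take the first
    // non-empty line and split it on commas to get candidate column names.
    val firstLine = lines.filter(_.trim.nonEmpty).first()
    val header = firstLine.split(",").map(_.trim)
    println(header.mkString("columns: ", ", ", ""))

    spark.stop()
  }
}
```

End users reach the real code path indirectly, for example via `spark.read.option("header", "true").option("inferSchema", "true").csv(path)`; the patch only changes how the lines backing that inference step are listed and read.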
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c33b0fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c33b0fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c33b0fd

Branch: refs/heads/master
Commit: 7c33b0fd050f3d2b08c1cfd7efbff8166832c1af
Parents: c7c7265
Author: Josh Rosen
Authored: Fri Dec 2 21:14:34 2016 -0800
Committer: Reynold Xin
Committed: Fri Dec 2 21:14:34 2016 -0800

----------------------------------------------------------------------
 .../datasources/csv/CSVFileFormat.scala          | 60 +++++++++-----------
 .../execution/datasources/csv/CSVRelation.scala  |  4 +-
 2 files changed, 28 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7c33b0fd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index a369115..e627f04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -27,10 +27,12 @@ import org.apache.hadoop.mapreduce._
 
 import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.text.TextFileFormat
+import org.apache.spark.sql.functions.{length, trim}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
@@ -52,17 +54,21 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
+    require(files.nonEmpty, "Cannot infer schema from an empty set of files")
     val csvOptions = new CSVOptions(options)
 
     // TODO: Move filtering.
     val paths = files.filterNot(_.getPath.getName startsWith "_").map(_.getPath.toString)
-    val rdd = baseRdd(sparkSession, csvOptions, paths)
-    val firstLine = findFirstLine(csvOptions, rdd)
+    val lines: Dataset[String] = readText(sparkSession, csvOptions, paths)
+    val firstLine: String = findFirstLine(csvOptions, lines)
     val firstRow = new CsvReader(csvOptions).parseLine(firstLine)
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
     val header = makeSafeHeader(firstRow, csvOptions, caseSensitive)
 
-    val parsedRdd = tokenRdd(sparkSession, csvOptions, header, paths)
+    val parsedRdd: RDD[Array[String]] = CSVRelation.univocityTokenizer(
+      lines,
+      firstLine = if (csvOptions.headerFlag) firstLine else null,
+      params = csvOptions)
     val schema = if (csvOptions.inferSchemaFlag) {
       CSVInferSchema.infer(parsedRdd, header, csvOptions)
     } else {
@@ -173,51 +179,37 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
     }
   }
 
-  private def baseRdd(
-      sparkSession: SparkSession,
-      options: CSVOptions,
-      inputPaths: Seq[String]): RDD[String] = {
-    readText(sparkSession, options, inputPaths.mkString(","))
-  }
-
-  private def tokenRdd(
-      sparkSession: SparkSession,
-      options: CSVOptions,
-      header: Array[String],
-      inputPaths: Seq[String]): RDD[Array[String]] = {
-    val rdd = baseRdd(sparkSession, options, inputPaths)
-    // Make sure firstLine is materialized before sending to executors
-    val firstLine = if (options.headerFlag) findFirstLine(options, rdd) else null
-    CSVRelation.univocityTokenizer(rdd, firstLine, options)
-  }
-
   /**
    * Returns the first line of the first non-empty file in path
    */
-  private def findFirstLine(options: CSVOptions, rdd: RDD[String]): String = {
+  private def findFirstLine(options: CSVOptions, lines: Dataset[String]): String = {
+    import lines.sqlContext.implicits._
+    val nonEmptyLines = lines.filter(length(trim($"value")) > 0)
     if (options.isCommentSet) {
-      val comment = options.comment.toString
-      rdd.filter { line =>
-        line.trim.nonEmpty && !line.startsWith(comment)
-      }.first()
+      nonEmptyLines.filter(!$"value".startsWith(options.comment.toString)).first()
     } else {
-      rdd.filter { line =>
-        line.trim.nonEmpty
-      }.first()
+      nonEmptyLines.first()
    }
   }
 
   private def readText(
       sparkSession: SparkSession,
       options: CSVOptions,
-      location: String): RDD[String] = {
+      inputPaths: Seq[String]): Dataset[String] = {
     if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
-      sparkSession.sparkContext.textFile(location)
+      sparkSession.baseRelationToDataFrame(
+        DataSource.apply(
+          sparkSession,
+          paths = inputPaths,
+          className = classOf[TextFileFormat].getName
+        ).resolveRelation(checkFilesExist = false))
+        .select("value").as[String](Encoders.STRING)
     } else {
       val charset = options.charset
-      sparkSession.sparkContext
-        .hadoopFile[LongWritable, Text, TextInputFormat](location)
+      val rdd = sparkSession.sparkContext
+        .hadoopFile[LongWritable, Text, TextInputFormat](inputPaths.mkString(","))
         .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
+      sparkSession.createDataset(rdd)(Encoders.STRING)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7c33b0fd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 52de11d..e4ce7a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -34,12 +34,12 @@ import org.apache.spark.sql.types._
 object CSVRelation extends Logging {
 
   def univocityTokenizer(
-      file: RDD[String],
+      file: Dataset[String],
       firstLine: String,
       params: CSVOptions): RDD[Array[String]] = {
     // If header is set, make sure firstLine is materialized before sending to executors.
     val commentPrefix = params.comment.toString
-    file.mapPartitions { iter =>
+    file.rdd.mapPartitions { iter =>
       val parser = new CsvReader(params)
       val filteredIter = iter.filter { line =>
         line.trim.nonEmpty && !line.startsWith(commentPrefix)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org