From: jackylk@apache.org
To: commits@carbondata.incubator.apache.org
Reply-To: dev@carbondata.incubator.apache.org
Date: Wed, 04 Jan 2017 09:44:21 -0000
Message-Id: <7b63da22973146babd72902793afde54@git.apache.org>
Subject: [1/2] incubator-carbondata git commit: unify csv reader
archived-at: Wed, 04 Jan 2017 09:44:36 -0000

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master af956f533 -> 82072ee19

unify csv reader

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/e9093755
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/e9093755
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/e9093755

Branch: refs/heads/master
Commit: e90937555ea443bea77c75079e9babb3b703fe0f
Parents: af956f5
Author: QiangCai
Authored: Tue Jan 3 16:28:06 2017 +0800
Committer: jackylk
Committed: Wed Jan 4 17:42:46 2017 +0800
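For context: this patch removes the forked com.databricks spark-csv reader and loads CSV through CarbonData's own CSVInputFormat instead. Below is a minimal sketch of the read path it adopts, condensed from the GlobalDictionaryUtil.loadDataFrame change further down; readCsv and its arguments are illustrative names, not part of the patch, and the real code additionally reuses a single StringArrayRow per record and calls CommonUtil.configureCSVInputFormat to set delimiter, quote, escape and header options.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import org.apache.carbondata.hadoop.csv.CSVInputFormat
import org.apache.carbondata.hadoop.io.StringArrayWritable

// Sketch only: build a DataFrame of StringType columns on top of CSVInputFormat,
// the same InputFormat the patch wires into global dictionary generation.
def readCsv(sqlContext: SQLContext, path: String, columnNames: Array[String]): DataFrame = {
  val conf = new Configuration()
  conf.set(FileInputFormat.INPUT_DIR, path)                  // CSV input location
  val schema = StructType(columnNames.map(StructField(_, StringType)))
  val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
    sqlContext.sparkContext,
    classOf[CSVInputFormat],
    classOf[NullWritable],
    classOf[StringArrayWritable],
    conf)
    .map { case (_, value) => Row.fromSeq(value.get()) }     // one Row per parsed CSV record
  sqlContext.createDataFrame(rdd, schema)
}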
----------------------------------------------------------------------
 integration/spark-common/pom.xml                |   5 -
 .../carbondata/spark/csv/CarbonCsvReader.scala  | 182 --------------
 .../spark/csv/CarbonCsvRelation.scala           | 249 -------------------
 .../carbondata/spark/csv/CarbonTextFile.scala   |  91 -------
 .../carbondata/spark/csv/DefaultSource.scala    | 183 --------------
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |  30 +++
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  16 +-
 .../carbondata/spark/util/CommonUtil.scala      |  46 ++++
 .../spark/util/GlobalDictionaryUtil.scala       |  75 +++---
 integration/spark/pom.xml                       |   5 -
 .../spark/CarbonDataFrameWriter.scala           |  40 ++-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   3 +-
 .../spark/sql/CarbonDatasourceRelation.scala    |   2 -
 .../dataload/DefaultSourceTestCase.scala        | 105 --------
 .../TestLoadDataWithNotProperInputFile.scala    |   1 +
 .../spark/rdd/CarbonDataRDDFactory.scala        |   3 +-
 pom.xml                                         |   1 -
 17 files changed, 158 insertions(+), 879 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark-common/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml
index 8934c89..4bb4e0a 100644
--- a/integration/spark-common/pom.xml
+++ b/integration/spark-common/pom.xml
@@ -75,11 +75,6 @@
       junit
-      com.databricks
-      spark-csv_${scala.binary.version}
-      1.2.0
-
-
       org.scalatest
       scalatest_${scala.binary.version}
       2.2.1

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
deleted file mode 100644
index 551fc9c..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.databricks.spark.sql.readers
-
-/**
- * Parser for parsing lines in bulk. Use this when efficiency is desired.
- * - * @param iter iterator over lines in the file - * @param fieldSep the delimiter used to separate fields in a line - * @param lineSep the delimiter used to separate lines - * @param quote character used to quote fields - * @param escape character used to escape the quote character - * @param ignoreLeadingSpace ignore white space before a field - * @param ignoreTrailingSpace ignore white space after a field - * @param headers headers for the columns - * @param inputBufSize size of buffer to use for parsing input, tune for performance - * @param maxCols maximum number of columns allowed, for safety against bad inputs - */ -class CarbonBulkCsvReader (iter: Iterator[String], - split: Int, - fieldSep: Char = ',', - lineSep: String = "\n", - quote: Char = '"', - escape: Char = '\\', - commentMarker: Char = '#', - ignoreLeadingSpace: Boolean = true, - ignoreTrailingSpace: Boolean = true, - headers: Seq[String], - inputBufSize: Int = 128, - maxCols: Int = 20480) - extends CsvReader(fieldSep, - lineSep, - quote, - escape, - commentMarker, - ignoreLeadingSpace, - ignoreTrailingSpace, - headers, - inputBufSize, - maxCols) - with Iterator[Array[String]] { - - private val reader = new CarbonStringIteratorReader(iter) - parser.beginParsing(reader) - private var nextRecord = parser.parseNext() - - /** - * get the next parsed line. - * - * @return array of strings where each string is a field in the CSV record - */ - def next: Array[String] = { - val curRecord = nextRecord - if(curRecord != null) { - nextRecord = parser.parseNext() - } else { - throw new NoSuchElementException("next record is null") - } - curRecord - } - - def hasNext: Boolean = nextRecord != null - -} - -/** - * A Reader that "reads" from a sequence of lines. Spark's textFile method removes newlines at - * end of each line Univocity parser requires a Reader that provides access to the data to be - * parsed and needs the newlines to be present - * @param iter iterator over RDD[String] - */ -private class CarbonStringIteratorReader(val iter: Iterator[String]) extends java.io.Reader { - - private var next: Long = 0 - private var length: Long = 0 // length of input so far - private var start: Long = 0 - private var str: String = null // current string from iter - - /** - * fetch next string from iter, if done with current one - * pretend there is a new line at the end of every string we get from from iter - */ - private def refill(): Unit = { - if (length == next) { - if (iter.hasNext) { - str = iter.next - start = length - // add a space to every line except the last one to store '\n' - if (iter.hasNext) { - length += (str.length + 1) // allowance for newline removed by SparkContext.textFile() - } else { - length += str.length - } - } else { - str = null - } - } - } - - /** - * read the next character, if at end of string pretend there is a new line - */ - override def read(): Int = { - refill() - if(next >= length) { - -1 - } else { - val cur = next - start - next += 1 - if (cur == str.length) '\n' else str.charAt(cur.toInt) - } - } - - /** - * read from str into cbuf - */ - def read(cbuf: Array[Char], off: Int, len: Int): Int = { - refill() - var n = 0 - if ((off < 0) || (off > cbuf.length) || (len < 0) || - ((off + len) > cbuf.length) || ((off + len) < 0)) { - throw new IndexOutOfBoundsException() - } else if (len == 0) { - n = 0 - } else { - if (next >= length) { // end of input - n = -1 - } else { - n = Math.min(length - next, len).toInt // lesser of amount of input available or buf size - // add a '\n' to every line except the 
last one - if (n == length - next && iter.hasNext) { - str.getChars((next - start).toInt, (next - start + n - 1).toInt, cbuf, off) - cbuf(off + n - 1) = '\n' - } else { - str.getChars((next - start).toInt, (next - start + n).toInt, cbuf, off) - } - next += n - if (n < len) { - val m = read(cbuf, off + n, len - n) // have more space, fetch more input from iter - if(m != -1) n += m - } - } - } - n - } - - override def skip(ns: Long): Long = { - throw new IllegalArgumentException("Skip not implemented") - } - - override def ready: Boolean = { - refill() - true - } - - override def markSupported: Boolean = false - - override def mark(readAheadLimit: Int): Unit = { - throw new IllegalArgumentException("Mark not implemented") - } - - override def reset(): Unit = { - throw new IllegalArgumentException("Mark and hence reset not implemented") - } - - def close(): Unit = { } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala deleted file mode 100644 index e751fe8..0000000 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala +++ /dev/null @@ -1,249 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.databricks.spark.csv - -import java.io.IOException - -import scala.collection.JavaConverters._ -import scala.util.control.NonFatal - -import com.databricks.spark.csv.newapi.CarbonTextFile -import com.databricks.spark.csv.util._ -import com.databricks.spark.sql.readers._ -import org.apache.commons.csv._ -import org.apache.commons.lang3.StringUtils -import org.apache.hadoop.fs.Path -import org.apache.spark.rdd.RDD -import org.apache.spark.sql._ -import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, TableScan} -import org.apache.spark.sql.types._ -import org.slf4j.LoggerFactory - -import org.apache.carbondata.processing.etl.DataLoadingException - -case class CarbonCsvRelation protected[spark] ( - location: String, - useHeader: Boolean, - delimiter: Char, - quote: Char, - escape: Character, - comment: Character, - parseMode: String, - parserLib: String, - ignoreLeadingWhiteSpace: Boolean, - ignoreTrailingWhiteSpace: Boolean, - userSchema: StructType = null, - charset: String = TextFile.DEFAULT_CHARSET.name(), - inferCsvSchema: Boolean)(@transient val sqlContext: SQLContext) - extends BaseRelation with TableScan with InsertableRelation { - - /** - * Limit the number of lines we'll search for a header row that isn't comment-prefixed. - */ - private val MAX_COMMENT_LINES_IN_HEADER = 10 - - private val logger = LoggerFactory.getLogger(CarbonCsvRelation.getClass) - - // Parse mode flags - if (!ParseModes.isValidMode(parseMode)) { - logger.warn(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.") - } - - if((ignoreLeadingWhiteSpace || ignoreLeadingWhiteSpace) && ParserLibs.isCommonsLib(parserLib)) { - logger.warn(s"Ignore white space options may not work with Commons parserLib option") - } - - private val failFast = ParseModes.isFailFastMode(parseMode) - private val dropMalformed = ParseModes.isDropMalformedMode(parseMode) - private val permissive = ParseModes.isPermissiveMode(parseMode) - - val schema = inferSchema() - - def tokenRdd(header: Array[String]): RDD[Array[String]] = { - - val baseRDD = CarbonTextFile.withCharset(sqlContext.sparkContext, location, charset) - - if(ParserLibs.isUnivocityLib(parserLib)) { - univocityParseCSV(baseRDD, header) - } else { - val csvFormat = CSVFormat.DEFAULT - .withDelimiter(delimiter) - .withQuote(quote) - .withEscape(escape) - .withSkipHeaderRecord(false) - .withHeader(header: _*) - .withCommentMarker(comment) - - // If header is set, make sure firstLine is materialized before sending to executors. - val filterLine = if (useHeader) firstLine else null - - baseRDD.mapPartitions { iter => - // When using header, any input line that equals firstLine is assumed to be header - val csvIter = if (useHeader) { - iter.filter(_ != filterLine) - } else { - iter - } - parseCSV(csvIter, csvFormat) - } - } - } - - // By making this a lazy val we keep the RDD around, amortizing the cost of locating splits. 
- def buildScan: RDD[Row] = { - val schemaFields = schema.fields - tokenRdd(schemaFields.map(_.name)).flatMap{ tokens => - - if (dropMalformed && schemaFields.length != tokens.size) { - logger.warn(s"Dropping malformed line: $tokens") - None - } else if (failFast && schemaFields.length != tokens.size) { - throw new RuntimeException(s"Malformed line in FAILFAST mode: $tokens") - } else { - var index: Int = 0 - val rowArray = new Array[Any](schemaFields.length) - try { - index = 0 - while (index < schemaFields.length) { - val field = schemaFields(index) - rowArray(index) = TypeCast.castTo(tokens(index), field.dataType, field.nullable) - index = index + 1 - } - Some(Row.fromSeq(rowArray)) - } catch { - case aiob: ArrayIndexOutOfBoundsException if permissive => - (index until schemaFields.length).foreach(ind => rowArray(ind) = null) - Some(Row.fromSeq(rowArray)) - } - } - } - } - - private def inferSchema(): StructType = { - if (this.userSchema != null) { - userSchema - } else { - val firstRow = if (ParserLibs.isUnivocityLib(parserLib)) { - val escapeVal = if (escape == null) '\\' else escape.charValue() - val commentChar: Char = if (comment == null) '\0' else comment - new LineCsvReader(fieldSep = delimiter, quote = quote, escape = escapeVal, - commentMarker = commentChar).parseLine(firstLine) - } else { - val csvFormat = CSVFormat.DEFAULT - .withDelimiter(delimiter) - .withQuote(quote) - .withEscape(escape) - .withSkipHeaderRecord(false) - CSVParser.parse(firstLine, csvFormat).getRecords.get(0).asScala.toArray - } - if(null == firstRow) { - throw new DataLoadingException("First line of the csv is not valid.") - } - val header = if (useHeader) { - firstRow - } else { - firstRow.zipWithIndex.map { case (value, index) => s"C$index"} - } - if (this.inferCsvSchema) { - InferSchema(tokenRdd(header), header) - } else { - // By default fields are assumed to be StringType - val schemaFields = header.map { fieldName => - StructField(fieldName.toString, StringType, nullable = true) - } - StructType(schemaFields) - } - } - } - - /** - * Returns the first line of the first non-empty file in path - */ - private lazy val firstLine = { - val csv = CarbonTextFile.withCharset(sqlContext.sparkContext, location, charset) - if (comment == null) { - csv.first() - } else { - csv.take(MAX_COMMENT_LINES_IN_HEADER) - .find(x => !StringUtils.isEmpty(x) && !x.startsWith(comment.toString)) - .getOrElse(sys.error(s"No uncommented header line in " + - s"first $MAX_COMMENT_LINES_IN_HEADER lines")) - } - } - - private def univocityParseCSV( - file: RDD[String], - header: Seq[String]): RDD[Array[String]] = { - // If header is set, make sure firstLine is materialized before sending to executors. 
- val filterLine = if (useHeader) firstLine else null - val dataLines = if (useHeader) file.filter(_ != filterLine) else file - val rows = dataLines.mapPartitionsWithIndex({ - case (split, iter) => - val escapeVal = if (escape == null) '\\' else escape.charValue() - val commentChar: Char = if (comment == null) '\0' else comment - - new CarbonBulkCsvReader(iter, split, - headers = header, fieldSep = delimiter, - quote = quote, escape = escapeVal, commentMarker = commentChar, - ignoreLeadingSpace = ignoreLeadingWhiteSpace, - ignoreTrailingSpace = ignoreTrailingWhiteSpace) - }, true) - rows - } - - private def parseCSV( - iter: Iterator[String], - csvFormat: CSVFormat): Iterator[Array[String]] = { - iter.flatMap { line => - try { - val records = CSVParser.parse(line, csvFormat).getRecords - if (records.isEmpty) { - logger.warn(s"Ignoring empty line: $line") - None - } else { - Some(records.get(0).asScala.toArray) - } - } catch { - case NonFatal(e) if !failFast => - logger.error(s"Exception while parsing line: $line. ", e) - None - } - } - } - - // The function below was borrowed from JSONRelation - override def insert(data: DataFrame, overwrite: Boolean): Unit = { - val filesystemPath = new Path(location) - val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - - if (overwrite) { - try { - fs.delete(filesystemPath, true) - } catch { - case e: IOException => - throw new IOException( - s"Unable to clear output directory ${filesystemPath.toString} prior" - + s" to INSERT OVERWRITE a CSV table:\n${e.toString}") - } - // Write the data. We assume that schema isn't changed, and we won't update it. - data.saveAsCsvFile(location, Map("delimiter" -> delimiter.toString)) - } else { - sys.error("CSV tables only support INSERT OVERWRITE for now.") - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala deleted file mode 100644 index b5d7542..0000000 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.databricks.spark.csv.newapi - -import java.nio.charset.Charset - -import com.databricks.spark.csv.util.TextFile -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.io.{LongWritable, Text} -import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat} -import org.apache.spark.SparkContext -import org.apache.spark.rdd.{NewHadoopRDD, RDD} -import org.apache.spark.util.FileUtils - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.CarbonCommonConstants - -/** - * create RDD use CarbonDataLoadInputFormat - */ -object CarbonTextFile { - - private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - - def configSplitMaxSize(context: SparkContext, filePaths: String, - hadoopConfiguration: Configuration): Unit = { - val defaultParallelism = if (context.defaultParallelism < 1) { - 1 - } else { - context.defaultParallelism - } - val spaceConsumed = FileUtils.getSpaceOccupied(filePaths) - val blockSize = - hadoopConfiguration.getLongBytes("dfs.blocksize", CarbonCommonConstants.CARBON_256MB) - LOGGER.info("[Block Distribution]") - // calculate new block size to allow use all the parallelism - if (spaceConsumed < defaultParallelism * blockSize) { - var newSplitSize: Long = spaceConsumed / defaultParallelism - if (newSplitSize < CarbonCommonConstants.CARBON_16MB) { - newSplitSize = CarbonCommonConstants.CARBON_16MB - } - hadoopConfiguration.set(FileInputFormat.SPLIT_MAXSIZE, newSplitSize.toString) - LOGGER.info(s"totalInputSpaceConsumed: $spaceConsumed , " + - s"defaultParallelism: $defaultParallelism") - LOGGER.info(s"mapreduce.input.fileinputformat.split.maxsize: ${ newSplitSize.toString }") - } - } - private def newHadoopRDD(sc: SparkContext, location: String) = { - val hadoopConfiguration = new Configuration(sc.hadoopConfiguration) - hadoopConfiguration.setStrings(FileInputFormat.INPUT_DIR, location) - hadoopConfiguration.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true) - hadoopConfiguration.set("io.compression.codecs", - """org.apache.hadoop.io.compress.GzipCodec, - org.apache.hadoop.io.compress.DefaultCodec, - org.apache.hadoop.io.compress.BZip2Codec""".stripMargin) - - configSplitMaxSize(sc, location, hadoopConfiguration) - new NewHadoopRDD[LongWritable, Text]( - sc, - classOf[TextInputFormat], - classOf[LongWritable], - classOf[Text], - hadoopConfiguration).setName("newHadoopRDD-spark-csv") - } - - def withCharset(sc: SparkContext, location: String, charset: String): RDD[String] = { - if (Charset.forName(charset) == TextFile.DEFAULT_CHARSET) { - newHadoopRDD(sc, location).map(pair => pair._2.toString) - } else { - // can't pass a Charset object here cause its not serializable - // TODO: maybe use mapPartitions instead? 
- newHadoopRDD(sc, location).map( - pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala deleted file mode 100644 index 89595e5..0000000 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.databricks.spark.csv.newapi - -import com.databricks.spark.csv.{CarbonCsvRelation, CsvSchemaRDD} -import com.databricks.spark.csv.util.{ParserLibs, TextFile, TypeCast} -import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec} -import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} -import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.StructType - -/** - * Provides access to CSV data from pure SQL statements (i.e. for users of the - * JDBC server). - */ -class DefaultSource - extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider { - - private def checkPath(parameters: Map[String, String]): String = { - parameters.getOrElse("path", sys.error("'path' must be specified for CSV data.")) - } - - /** - * Creates a new relation for data store in CSV given parameters. - * Parameters have to include 'path' and optionally 'delimiter', 'quote', and 'header' - */ - override def createRelation(sqlContext: SQLContext, - parameters: Map[String, String]): BaseRelation = { - createRelation(sqlContext, parameters, null) - } - - /** - * Creates a new relation for data store in CSV given parameters and user supported schema. 
- * Parameters have to include 'path' and optionally 'delimiter', 'quote', and 'header' - */ - override def createRelation( - sqlContext: SQLContext, - parameters: Map[String, String], - schema: StructType): BaseRelation = { - val path = checkPath(parameters) - val delimiter = TypeCast.toChar(parameters.getOrElse("delimiter", ",")) - - val quote = parameters.getOrElse("quote", "\"") - val quoteChar = if (quote.length == 1) { - quote.charAt(0) - } else { - throw new Exception("Quotation cannot be more than one character.") - } - - val escape = parameters.getOrElse("escape", null) - val escapeChar: Character = if (escape == null || (escape.length == 0)) { - null - } else if (escape.length == 1) { - escape.charAt(0) - } else { - throw new Exception("Escape character cannot be more than one character.") - } - - val comment = parameters.getOrElse("comment", "#") - val commentChar: Character = if (comment == null) { - null - } else if (comment.length == 1) { - comment.charAt(0) - } else { - throw new Exception("Comment marker cannot be more than one character.") - } - - val parseMode = parameters.getOrElse("mode", "PERMISSIVE") - - val useHeader = parameters.getOrElse("header", "false") - val headerFlag = if (useHeader == "true") { - true - } else if (useHeader == "false") { - false - } else { - throw new Exception("Header flag can be true or false") - } - - val parserLib = parameters.getOrElse("parserLib", ParserLibs.DEFAULT) - val ignoreLeadingWhiteSpace = parameters.getOrElse("ignoreLeadingWhiteSpace", "false") - val ignoreLeadingWhiteSpaceFlag = if (ignoreLeadingWhiteSpace == "false") { - false - } else if (ignoreLeadingWhiteSpace == "true") { - if (!ParserLibs.isUnivocityLib(parserLib)) { - throw new Exception("Ignore whitesspace supported for Univocity parser only") - } - true - } else { - throw new Exception("Ignore white space flag can be true or false") - } - val ignoreTrailingWhiteSpace = parameters.getOrElse("ignoreTrailingWhiteSpace", "false") - val ignoreTrailingWhiteSpaceFlag = if (ignoreTrailingWhiteSpace == "false") { - false - } else if (ignoreTrailingWhiteSpace == "true") { - if (!ParserLibs.isUnivocityLib(parserLib)) { - throw new Exception("Ignore whitespace supported for the Univocity parser only") - } - true - } else { - throw new Exception("Ignore white space flag can be true or false") - } - - val charset = parameters.getOrElse("charset", TextFile.DEFAULT_CHARSET.name()) - // TODO validate charset? 
- - val inferSchema = parameters.getOrElse("inferSchema", "false") - val inferSchemaFlag = if (inferSchema == "false") { - false - } else if (inferSchema == "true") { - true - } else { - throw new Exception("Infer schema flag can be true or false") - } - - CarbonCsvRelation(path, - headerFlag, - delimiter, - quoteChar, - escapeChar, - commentChar, - parseMode, - parserLib, - ignoreLeadingWhiteSpaceFlag, - ignoreTrailingWhiteSpaceFlag, - schema, - charset, - inferSchemaFlag)(sqlContext) - } - - override def createRelation( - sqlContext: SQLContext, - mode: SaveMode, - parameters: Map[String, String], - data: DataFrame): BaseRelation = { - val path = checkPath(parameters) - val filesystemPath = new Path(path) - val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - val doSave = if (fs.exists(filesystemPath)) { - mode match { - case SaveMode.Append => - sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}") - case SaveMode.Overwrite => - fs.delete(filesystemPath, true) - true - case SaveMode.ErrorIfExists => - sys.error(s"path $path already exists.") - case SaveMode.Ignore => false - } - } else { - true - } - - val codec: Class[_ <: CompressionCodec] = - parameters.getOrElse("codec", "none") match { - case "gzip" => classOf[GzipCodec] - case _ => null - } - - if (doSave) { - // Only save data when the save mode is not ignore. - data.saveAsCsvFile(path, parameters, codec) - } - - createRelation(sqlContext, parameters, data.schema) - } -} - http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala index 9d9fa99..53ebd41 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala @@ -229,6 +229,36 @@ class CarbonAllDictionaryCombineRDD( } } +class StringArrayRow(var values: Array[String]) extends Row { + + override def length: Int = values.length + + override def get(i: Int): Any = values(i) + + override def getString(i: Int): String = values(i) + + private def reset(): Unit = { + for (i <- 0 until values.length) { + values(i) = null + } + } + + override def copy(): Row = { + val tmpValues = new Array[String](values.length) + System.arraycopy(values, 0, tmpValues, 0, values.length) + new StringArrayRow(tmpValues) + } + + def setValues(values: Array[String]): StringArrayRow = { + reset() + if (values != null) { + val minLength = Math.min(this.values.length, values.length) + System.arraycopy(values, 0, this.values, 0, minLength) + } + this + } +} + /** * A RDD to combine distinct values in block. 
* http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 22d8406..497df75 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -50,7 +50,7 @@ import org.apache.carbondata.processing.newflow.DataLoadExecutor import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException import org.apache.carbondata.spark.DataLoadResult import org.apache.carbondata.spark.splits.TableSplit -import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil} +import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil, CommonUtil} class SerializableConfiguration(@transient var value: Configuration) extends Serializable { @@ -180,7 +180,7 @@ class NewCarbonDataLoadRDD[K, V]( if (configuration == null) { configuration = new Configuration() } - configureCSVInputFormat(configuration) + CommonUtil.configureCSVInputFormat(configuration, carbonLoadModel) val hadoopAttemptContext = new TaskAttemptContextImpl(configuration, attemptId) val format = new CSVInputFormat if (isTableSplitPartition) { @@ -236,18 +236,6 @@ class NewCarbonDataLoadRDD[K, V]( } } - def configureCSVInputFormat(configuration: Configuration): Unit = { - CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar) - CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter) - CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar) - CSVInputFormat.setHeaderExtractionEnabled(configuration, - carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty) - CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar) - CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE, - CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT)) - } - /** * generate blocks id * http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index 6766a39..8285bf8 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -22,14 +22,23 @@ import java.util import scala.collection.JavaConverters._ import scala.collection.mutable.Map +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat +import org.apache.spark.SparkContext import org.apache.spark.sql.execution.command.{ColumnProperty, Field} +import org.apache.spark.util.FileUtils +import org.apache.carbondata.common.logging.LogServiceFactory import 
org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.hadoop.csv.CSVInputFormat import org.apache.carbondata.lcm.status.SegmentStatusManager import org.apache.carbondata.processing.model.CarbonLoadModel import org.apache.carbondata.spark.exception.MalformedCarbonCommandException object CommonUtil { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + def validateColumnGroup(colGroup: String, noDictionaryDims: Seq[String], msrs: Seq[Field], retrievedColGrps: Seq[String], dims: Seq[Field]) { val colGrpCols = colGroup.split(',').map(_.trim) @@ -255,4 +264,41 @@ object CommonUtil { val details = SegmentStatusManager.readLoadMetadata(metadataPath) model.setLoadMetadataDetails(details.toList.asJava) } + + def configureCSVInputFormat(configuration: Configuration, + carbonLoadModel: CarbonLoadModel): Unit = { + CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar) + CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter) + CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar) + CSVInputFormat.setHeaderExtractionEnabled(configuration, + carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty) + CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar) + CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance + .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE, + CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT)) + } + + def configSplitMaxSize(context: SparkContext, filePaths: String, + hadoopConfiguration: Configuration): Unit = { + val defaultParallelism = if (context.defaultParallelism < 1) { + 1 + } else { + context.defaultParallelism + } + val spaceConsumed = FileUtils.getSpaceOccupied(filePaths) + val blockSize = + hadoopConfiguration.getLongBytes("dfs.blocksize", CarbonCommonConstants.CARBON_256MB) + LOGGER.info("[Block Distribution]") + // calculate new block size to allow use all the parallelism + if (spaceConsumed < defaultParallelism * blockSize) { + var newSplitSize: Long = spaceConsumed / defaultParallelism + if (newSplitSize < CarbonCommonConstants.CARBON_16MB) { + newSplitSize = CarbonCommonConstants.CARBON_16MB + } + hadoopConfiguration.set(FileInputFormat.SPLIT_MAXSIZE, newSplitSize.toString) + LOGGER.info(s"totalInputSpaceConsumed: $spaceConsumed , " + + s"defaultParallelism: $defaultParallelism") + LOGGER.info(s"mapreduce.input.fileinputformat.split.maxsize: ${ newSplitSize.toString }") + } + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala index 84c71ed..e87b27b 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala @@ -27,9 +27,13 @@ import scala.language.implicitConversions import scala.util.control.Breaks.{break, breakable} import org.apache.commons.lang3.{ArrayUtils, StringUtils} +import org.apache.hadoop.conf.Configuration +import 
org.apache.hadoop.io.NullWritable +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.spark.Accumulator -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{NewHadoopRDD, RDD} import org.apache.spark.sql._ +import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.util.FileUtils import org.apache.carbondata.common.factory.CarbonCommonFactory @@ -45,8 +49,11 @@ import org.apache.carbondata.core.datastorage.store.impl.FileFactory import org.apache.carbondata.core.reader.CarbonDictionaryReader import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.core.writer.CarbonDictionaryWriter +import org.apache.carbondata.hadoop.csv.CSVInputFormat +import org.apache.carbondata.hadoop.io.StringArrayWritable import org.apache.carbondata.processing.etl.DataLoadingException import org.apache.carbondata.processing.model.CarbonLoadModel +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException import org.apache.carbondata.spark.CarbonSparkFactory import org.apache.carbondata.spark.load.CarbonLoaderUtil import org.apache.carbondata.spark.rdd._ @@ -357,37 +364,49 @@ object GlobalDictionaryUtil { */ def loadDataFrame(sqlContext: SQLContext, carbonLoadModel: CarbonLoadModel): DataFrame = { - val df = sqlContext.read - .format("com.databricks.spark.csv.newapi") - .option("header", { - if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) { - "true" - } else { - "false" - } - }) - .option("delimiter", { - if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) { - "" + DEFAULT_SEPARATOR - } else { - carbonLoadModel.getCsvDelimiter + val hadoopConfiguration = new Configuration() + CommonUtil.configureCSVInputFormat(hadoopConfiguration, carbonLoadModel) + hadoopConfiguration.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath) + val header = if (StringUtils.isBlank(carbonLoadModel.getCsvHeader)) { + val fileHeader = CarbonUtil.readHeader(carbonLoadModel.getFactFilePath.split(",")(0)) + if (StringUtils.isBlank(fileHeader)) { + throw new CarbonDataLoadingException("First line of the csv is not valid."); } - }) - .option("parserLib", "univocity") - .option("escape", carbonLoadModel.getEscapeChar) - .option("ignoreLeadingWhiteSpace", "false") - .option("ignoreTrailingWhiteSpace", "false") - .option("codec", "gzip") - .option("quote", { - if (StringUtils.isEmpty(carbonLoadModel.getQuoteChar)) { - "" + DEFAULT_QUOTE_CHARACTER + fileHeader + } else { + carbonLoadModel.getCsvHeader + } + val delimiter = if (carbonLoadModel.getCsvDelimiter == null) { + CarbonCommonConstants.COMMA + } else { + carbonLoadModel.getCsvDelimiter + } + val quote = if (carbonLoadModel.getQuoteChar == null) { + "\"" + } else { + carbonLoadModel.getQuoteChar + } + val columnNames = header.split(delimiter).map { column => + if ( column.startsWith(quote) && column.endsWith(quote)) { + column.substring(1, column.length - 1) } else { - carbonLoadModel.getQuoteChar + column } + } + val schema = StructType(columnNames.map[StructField, Array[StructField]]{ column => + StructField(column, StringType) }) - .option("comment", carbonLoadModel.getCommentChar) - .load(carbonLoadModel.getFactFilePath) - df + val values = new Array[String](columnNames.length) + val row = new StringArrayRow(values) + val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable]( + sqlContext.sparkContext, + classOf[CSVInputFormat], + classOf[NullWritable], + classOf[StringArrayWritable], + 
hadoopConfiguration).setName("global dictionary").map[Row] { currentRow => + row.setValues(currentRow._2.get()) + } + sqlContext.createDataFrame(rdd, schema) } // Hack for spark2 integration http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml index 9d584b9..ad921c0 100644 --- a/integration/spark/pom.xml +++ b/integration/spark/pom.xml @@ -35,11 +35,6 @@ - com.databricks - spark-csv_${scala.binary.version} - 1.2.0 - - org.apache.carbondata carbondata-common ${project.version} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala index 0954374..ef85635 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala @@ -18,6 +18,7 @@ package org.apache.carbondata.spark import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.compress.GzipCodec import org.apache.spark.sql._ import org.apache.spark.sql.execution.command.LoadTable import org.apache.spark.sql.types._ @@ -103,17 +104,38 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) { } private def writeToTempCSVFile(tempCSVFolder: String, options: CarbonOption): Unit = { - var writer: DataFrameWriter = - dataFrame.write - .format(csvPackage) - .option("header", "false") - .mode(SaveMode.Overwrite) - if (options.compress) { - writer = writer.option("codec", "gzip") + val strRDD = dataFrame.rdd.mapPartitions { case iter => + new Iterator[String] { + override def hasNext = iter.hasNext + + def convertToCSVString(seq: Seq[Any]): String = { + val build = new java.lang.StringBuilder() + if (seq.head != null) { + build.append(seq.head.toString) + } + val itemIter = seq.tail.iterator + while (itemIter.hasNext) { + build.append(CarbonCommonConstants.COMMA) + val value = itemIter.next() + if (value != null) { + build.append(value.toString) + } + } + build.toString + } + + override def next: String = { + convertToCSVString(iter.next.toSeq) + } + } } - writer.save(tempCSVFolder) + if (options.compress) { + strRDD.saveAsTextFile(tempCSVFolder, classOf[GzipCodec]) + } else { + strRDD.saveAsTextFile(tempCSVFolder) + } } /** @@ -134,8 +156,6 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) { Some(dataFrame)).run(cc) } - private def csvPackage: String = "com.databricks.spark.csv.newapi" - private def convertToCarbonType(sparkType: DataType): String = { sparkType match { case StringType => CarbonType.STRING.getName http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 1f8af66..8504fc8 100644 --- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -25,7 +25,6 @@ import scala.collection.mutable.ListBuffer import scala.util.Random import scala.util.control.Breaks._ -import com.databricks.spark.csv.newapi.CarbonTextFile import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.Job @@ -551,7 +550,7 @@ object CarbonDataRDDFactory { org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.BZip2Codec""".stripMargin) - CarbonTextFile.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration) + CommonUtil.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration) val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat inputFormat match { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala index 20569df..2091598 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala @@ -21,11 +21,9 @@ import java.util import java.util.LinkedHashSet import scala.collection.JavaConverters._ -import scala.collection.mutable.ListBuffer import scala.language.implicitConversions import org.apache.hadoop.fs.Path -import org.apache.spark.sql.SchemaRDD import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.AttributeReference http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/DefaultSourceTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/DefaultSourceTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/DefaultSourceTestCase.scala deleted file mode 100644 index 20229bb..0000000 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/DefaultSourceTestCase.scala +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.carbondata.spark.testsuite.dataload - -import java.io.BufferedWriter -import java.io.File -import java.io.FileWriter -import java.util.Random - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat -import org.apache.spark.sql.common.util.CarbonHiveContext -import org.apache.spark.sql.common.util.CarbonHiveContext.sql -import org.apache.spark.sql.common.util.QueryTest -import org.scalatest.BeforeAndAfterAll - -/** - * Test Case for new defaultsource: com.databricks.spark.csv.newapi - * - * @date: Apr 10, 2016 10:34:58 PM - * @See org.apache.carbondata.spark.util.GlobalDictionaryUtil - */ -class DefaultSourceTestCase extends QueryTest with BeforeAndAfterAll { - - var filePath: String = _ - - override def beforeAll { - buildTestData - buildTable - } - - def buildTestData() = { - val workDirectory = new File(this.getClass.getResource("/").getPath + "/../../").getCanonicalPath - filePath = workDirectory + "/target/defaultsource.csv" - val file = new File(filePath) - val writer = new BufferedWriter(new FileWriter(file)) - writer.write("c1,c2,c3,c4") - writer.newLine() - var i = 0 - val random = new Random - for (i <- 0 until 2000000) { - writer.write(" aaaaaaa" + i + " , " + - "bbbbbbb" + i % 1000 + "," + - i % 1000000 + "," + i % 10000 + "\n") - } - writer.close - } - - def buildTable() = { - try { - sql("drop table if exists defaultsource") - sql("""create table if not exists defaultsource - (c1 string, c2 string, c3 int, c4 int) - STORED BY 'org.apache.carbondata.format'""") - } catch { - case ex: Throwable => LOGGER.error(ex.getMessage + "\r\n" + ex.getStackTraceString) - } - } - - test("test new defaultsource: com.databricks.spark.csv.newapi") { - val df1 = CarbonHiveContext.read - .format("com.databricks.spark.csv.newapi") - .option("header", "true") - .option("delimiter", ",") - .option("parserLib", "univocity") - .option("ignoreLeadingWhiteSpace", "true") - .option("ignoreTrailingWhiteSpace", "true") - .load(filePath) - - assert(!df1.first().getString(0).startsWith(" ")) - assert(df1.count() == 2000000) - assert(df1.rdd.partitions.length == 3) - } - - test("test defaultsource: com.databricks.spark.csv") { - val df2 = CarbonHiveContext.read - .format("com.databricks.spark.csv") - .option("header", "true") - .option("delimiter", ",") - .option("parserLib", "univocity") - .option("ignoreLeadingWhiteSpace", "true") - .option("ignoreTrailingWhiteSpace", "true") - .load(filePath) - - assert(!df2.first().getString(0).startsWith(" ")) - assert(df2.count() == 2000000) - assert(df2.rdd.partitions.length == 3) - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala index fa8731a..c5bc058 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala @@ -56,6 +56,7 @@ class TestLoadDataWithNotProperInputFile extends QueryTest { 
GlobalDictionaryUtil.loadDataFrame(CarbonHiveContext, carbonLoadModel) } catch { case e: Throwable => + e.printStackTrace() assert(e.getMessage.contains("Please check your input path and make sure " + "that files end with '.csv' and content is not empty")) } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 96c4a08..e147003 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -25,7 +25,6 @@ import scala.collection.mutable.ListBuffer import scala.util.Random import scala.util.control.Breaks._ -import com.databricks.spark.csv.newapi.CarbonTextFile import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.Job @@ -547,7 +546,7 @@ object CarbonDataRDDFactory { org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.BZip2Codec""".stripMargin) - CarbonTextFile.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration) + CommonUtil.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration) val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat inputFormat match { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c91f186..c1bfcbe 100644 --- a/pom.xml +++ b/pom.xml @@ -102,7 +102,6 @@ UTF-8 - 1.2.0 1.1.2.6 2.2.0 4.4.0-stable
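
On the write side, CarbonDataFrameWriter no longer routes its temporary CSV through the spark-csv data source; it renders each row to a comma-separated line and writes the RDD with saveAsTextFile, gzip-compressed when options.compress is set. A condensed sketch of that approach follows; writeTempCsv is an illustrative name, and the patch itself builds each line with a StringBuilder inside mapPartitions and uses CarbonCommonConstants.COMMA rather than a plain mkString.

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.DataFrame

// Sketch only: turn each Row into a comma-separated line (nulls become empty
// fields) and let Spark write the text files, compressed when requested.
def writeTempCsv(df: DataFrame, tempCSVFolder: String, compress: Boolean): Unit = {
  val strRDD = df.rdd.map { row =>
    row.toSeq.map(v => if (v == null) "" else v.toString).mkString(",")
  }
  if (compress) {
    strRDD.saveAsTextFile(tempCSVFolder, classOf[GzipCodec])
  } else {
    strRDD.saveAsTextFile(tempCSVFolder)
  }
}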