carbondata-commits mailing list archives

From jack...@apache.org
Subject [1/2] incubator-carbondata git commit: unify csv reader
Date Wed, 04 Jan 2017 09:44:21 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master af956f533 -> 82072ee19


unify csv reader

Remove the forked com.databricks.spark.csv reader classes and the spark-csv dependency, and route all CSV input (data loading, global dictionary generation and the DataFrame write path) through CarbonData's own CSVInputFormat, configured via the new CommonUtil helpers.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/e9093755
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/e9093755
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/e9093755

Branch: refs/heads/master
Commit: e90937555ea443bea77c75079e9babb3b703fe0f
Parents: af956f5
Author: QiangCai <qiangcai@qq.com>
Authored: Tue Jan 3 16:28:06 2017 +0800
Committer: jackylk <jacky.likun@huawei.com>
Committed: Wed Jan 4 17:42:46 2017 +0800

----------------------------------------------------------------------
 integration/spark-common/pom.xml                |   5 -
 .../carbondata/spark/csv/CarbonCsvReader.scala  | 182 --------------
 .../spark/csv/CarbonCsvRelation.scala           | 249 -------------------
 .../carbondata/spark/csv/CarbonTextFile.scala   |  91 -------
 .../carbondata/spark/csv/DefaultSource.scala    | 183 --------------
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |  30 +++
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  16 +-
 .../carbondata/spark/util/CommonUtil.scala      |  46 ++++
 .../spark/util/GlobalDictionaryUtil.scala       |  75 +++---
 integration/spark/pom.xml                       |   5 -
 .../spark/CarbonDataFrameWriter.scala           |  40 ++-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   3 +-
 .../spark/sql/CarbonDatasourceRelation.scala    |   2 -
 .../dataload/DefaultSourceTestCase.scala        | 105 --------
 .../TestLoadDataWithNotProperInputFile.scala    |   1 +
 .../spark/rdd/CarbonDataRDDFactory.scala        |   3 +-
 pom.xml                                         |   1 -
 17 files changed, 158 insertions(+), 879 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark-common/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml
index 8934c89..4bb4e0a 100644
--- a/integration/spark-common/pom.xml
+++ b/integration/spark-common/pom.xml
@@ -75,11 +75,6 @@
       <artifactId>junit</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.databricks</groupId>
-      <artifactId>spark-csv_${scala.binary.version}</artifactId>
-      <version>1.2.0</version>
-    </dependency>
-    <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>
       <version>2.2.1</version>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
deleted file mode 100644
index 551fc9c..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.databricks.spark.sql.readers
-
-/**
- * Parser for parsing lines in bulk. Use this when efficiency is desired.
- *
- * @param iter iterator over lines in the file
- * @param fieldSep the delimiter used to separate fields in a line
- * @param lineSep the delimiter used to separate lines
- * @param quote character used to quote fields
- * @param escape character used to escape the quote character
- * @param ignoreLeadingSpace ignore white space before a field
- * @param ignoreTrailingSpace ignore white space after a field
- * @param headers headers for the columns
- * @param inputBufSize size of buffer to use for parsing input, tune for performance
- * @param maxCols maximum number of columns allowed, for safety against bad inputs
- */
-class CarbonBulkCsvReader (iter: Iterator[String],
-    split: Int,
-    fieldSep: Char = ',',
-    lineSep: String = "\n",
-    quote: Char = '"',
-    escape: Char = '\\',
-    commentMarker: Char = '#',
-    ignoreLeadingSpace: Boolean = true,
-    ignoreTrailingSpace: Boolean = true,
-    headers: Seq[String],
-    inputBufSize: Int = 128,
-    maxCols: Int = 20480)
-  extends CsvReader(fieldSep,
-      lineSep,
-      quote,
-      escape,
-      commentMarker,
-      ignoreLeadingSpace,
-      ignoreTrailingSpace,
-      headers,
-      inputBufSize,
-      maxCols)
-    with Iterator[Array[String]] {
-
-  private val reader = new CarbonStringIteratorReader(iter)
-  parser.beginParsing(reader)
-  private var nextRecord = parser.parseNext()
-
-  /**
-   * get the next parsed line.
-   *
-   * @return array of strings where each string is a field in the CSV record
-   */
-  def next: Array[String] = {
-    val curRecord = nextRecord
-    if(curRecord != null) {
-      nextRecord = parser.parseNext()
-    } else {
-      throw new NoSuchElementException("next record is null")
-    }
-    curRecord
-  }
-
-  def hasNext: Boolean = nextRecord != null
-
-}
-
-/**
- * A Reader that "reads" from a sequence of lines. Spark's textFile method removes newlines at
- * end of each line Univocity parser requires a Reader that provides access to the data to be
- * parsed and needs the newlines to be present
- * @param iter iterator over RDD[String]
- */
-private class CarbonStringIteratorReader(val iter: Iterator[String]) extends java.io.Reader {
-
-  private var next: Long = 0
-  private var length: Long = 0  // length of input so far
-  private var start: Long = 0
-  private var str: String = null   // current string from iter
-
-  /**
-   * fetch next string from iter, if done with current one
-   * pretend there is a new line at the end of every string we get from from iter
-   */
-  private def refill(): Unit = {
-    if (length == next) {
-      if (iter.hasNext) {
-        str = iter.next
-        start = length
-        // add a space to every line except the last one to store '\n'
-        if (iter.hasNext) {
-          length += (str.length + 1) // allowance for newline removed by SparkContext.textFile()
-        } else {
-          length += str.length
-        }
-      } else {
-        str = null
-      }
-    }
-  }
-
-  /**
-   * read the next character, if at end of string pretend there is a new line
-   */
-  override def read(): Int = {
-    refill()
-    if(next >= length) {
-      -1
-    } else {
-      val cur = next - start
-      next += 1
-      if (cur == str.length) '\n' else str.charAt(cur.toInt)
-    }
-  }
-
-  /**
-   * read from str into cbuf
-   */
-  def read(cbuf: Array[Char], off: Int, len: Int): Int = {
-    refill()
-    var n = 0
-    if ((off < 0) || (off > cbuf.length) || (len < 0) ||
-      ((off + len) > cbuf.length) || ((off + len) < 0)) {
-      throw new IndexOutOfBoundsException()
-    } else if (len == 0) {
-      n = 0
-    } else {
-      if (next >= length) {   // end of input
-        n = -1
-      } else {
-        n = Math.min(length - next, len).toInt // lesser of amount of input available or buf size
-        // add a '\n' to every line except the last one
-        if (n == length - next && iter.hasNext) {
-          str.getChars((next - start).toInt, (next - start + n - 1).toInt, cbuf, off)
-          cbuf(off + n - 1) = '\n'
-        } else {
-          str.getChars((next - start).toInt, (next - start + n).toInt, cbuf, off)
-        }
-        next += n
-        if (n < len) {
-          val m = read(cbuf, off + n, len - n)  // have more space, fetch more input from iter
-          if(m != -1) n += m
-        }
-      }
-    }
-    n
-  }
-
-  override def skip(ns: Long): Long = {
-    throw new IllegalArgumentException("Skip not implemented")
-  }
-
-  override def ready: Boolean = {
-    refill()
-    true
-  }
-
-  override def markSupported: Boolean = false
-
-  override def mark(readAheadLimit: Int): Unit = {
-    throw new IllegalArgumentException("Mark not implemented")
-  }
-
-  override def reset(): Unit = {
-    throw new IllegalArgumentException("Mark and hence reset not implemented")
-  }
-
-  def close(): Unit = { }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
deleted file mode 100644
index e751fe8..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.databricks.spark.csv
-
-import java.io.IOException
-
-import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
-
-import com.databricks.spark.csv.newapi.CarbonTextFile
-import com.databricks.spark.csv.util._
-import com.databricks.spark.sql.readers._
-import org.apache.commons.csv._
-import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.fs.Path
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, TableScan}
-import org.apache.spark.sql.types._
-import org.slf4j.LoggerFactory
-
-import org.apache.carbondata.processing.etl.DataLoadingException
-
-case class CarbonCsvRelation protected[spark] (
-    location: String,
-    useHeader: Boolean,
-    delimiter: Char,
-    quote: Char,
-    escape: Character,
-    comment: Character,
-    parseMode: String,
-    parserLib: String,
-    ignoreLeadingWhiteSpace: Boolean,
-    ignoreTrailingWhiteSpace: Boolean,
-    userSchema: StructType = null,
-    charset: String = TextFile.DEFAULT_CHARSET.name(),
-    inferCsvSchema: Boolean)(@transient val sqlContext: SQLContext)
-  extends BaseRelation with TableScan with InsertableRelation {
-
-  /**
-   * Limit the number of lines we'll search for a header row that isn't comment-prefixed.
-   */
-  private val MAX_COMMENT_LINES_IN_HEADER = 10
-
-  private val logger = LoggerFactory.getLogger(CarbonCsvRelation.getClass)
-
-  // Parse mode flags
-  if (!ParseModes.isValidMode(parseMode)) {
-    logger.warn(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
-  }
-
-  if((ignoreLeadingWhiteSpace || ignoreLeadingWhiteSpace) && ParserLibs.isCommonsLib(parserLib)) {
-    logger.warn(s"Ignore white space options may not work with Commons parserLib option")
-  }
-
-  private val failFast = ParseModes.isFailFastMode(parseMode)
-  private val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
-  private val permissive = ParseModes.isPermissiveMode(parseMode)
-
-  val schema = inferSchema()
-
-  def tokenRdd(header: Array[String]): RDD[Array[String]] = {
-
-    val baseRDD = CarbonTextFile.withCharset(sqlContext.sparkContext, location, charset)
-
-    if(ParserLibs.isUnivocityLib(parserLib)) {
-      univocityParseCSV(baseRDD, header)
-    } else {
-      val csvFormat = CSVFormat.DEFAULT
-        .withDelimiter(delimiter)
-        .withQuote(quote)
-        .withEscape(escape)
-        .withSkipHeaderRecord(false)
-        .withHeader(header: _*)
-        .withCommentMarker(comment)
-
-      // If header is set, make sure firstLine is materialized before sending to executors.
-      val filterLine = if (useHeader) firstLine else null
-
-      baseRDD.mapPartitions { iter =>
-        // When using header, any input line that equals firstLine is assumed to be header
-        val csvIter = if (useHeader) {
-          iter.filter(_ != filterLine)
-        } else {
-          iter
-        }
-        parseCSV(csvIter, csvFormat)
-      }
-    }
-  }
-
-  // By making this a lazy val we keep the RDD around, amortizing the cost of locating splits.
-  def buildScan: RDD[Row] = {
-    val schemaFields = schema.fields
-    tokenRdd(schemaFields.map(_.name)).flatMap{ tokens =>
-
-      if (dropMalformed && schemaFields.length != tokens.size) {
-        logger.warn(s"Dropping malformed line: $tokens")
-        None
-      } else if (failFast && schemaFields.length != tokens.size) {
-        throw new RuntimeException(s"Malformed line in FAILFAST mode: $tokens")
-      } else {
-        var index: Int = 0
-        val rowArray = new Array[Any](schemaFields.length)
-        try {
-          index = 0
-          while (index < schemaFields.length) {
-            val field = schemaFields(index)
-            rowArray(index) = TypeCast.castTo(tokens(index), field.dataType, field.nullable)
-            index = index + 1
-          }
-          Some(Row.fromSeq(rowArray))
-        } catch {
-          case aiob: ArrayIndexOutOfBoundsException if permissive =>
-            (index until schemaFields.length).foreach(ind => rowArray(ind) = null)
-            Some(Row.fromSeq(rowArray))
-        }
-      }
-    }
-  }
-
-  private def inferSchema(): StructType = {
-    if (this.userSchema != null) {
-      userSchema
-    } else {
-      val firstRow = if (ParserLibs.isUnivocityLib(parserLib)) {
-        val escapeVal = if (escape == null) '\\' else escape.charValue()
-        val commentChar: Char = if (comment == null) '\0' else comment
-        new LineCsvReader(fieldSep = delimiter, quote = quote, escape = escapeVal,
-          commentMarker = commentChar).parseLine(firstLine)
-      } else {
-        val csvFormat = CSVFormat.DEFAULT
-          .withDelimiter(delimiter)
-          .withQuote(quote)
-          .withEscape(escape)
-          .withSkipHeaderRecord(false)
-        CSVParser.parse(firstLine, csvFormat).getRecords.get(0).asScala.toArray
-      }
-      if(null == firstRow) {
-        throw new DataLoadingException("First line of the csv is not valid.")
-      }
-      val header = if (useHeader) {
-        firstRow
-      } else {
-        firstRow.zipWithIndex.map { case (value, index) => s"C$index"}
-      }
-      if (this.inferCsvSchema) {
-        InferSchema(tokenRdd(header), header)
-      } else {
-        // By default fields are assumed to be StringType
-        val schemaFields = header.map { fieldName =>
-          StructField(fieldName.toString, StringType, nullable = true)
-        }
-        StructType(schemaFields)
-      }
-    }
-  }
-
-  /**
-   * Returns the first line of the first non-empty file in path
-   */
-  private lazy val firstLine = {
-    val csv = CarbonTextFile.withCharset(sqlContext.sparkContext, location, charset)
-    if (comment == null) {
-      csv.first()
-    } else {
-      csv.take(MAX_COMMENT_LINES_IN_HEADER)
-        .find(x => !StringUtils.isEmpty(x) && !x.startsWith(comment.toString))
-        .getOrElse(sys.error(s"No uncommented header line in " +
-          s"first $MAX_COMMENT_LINES_IN_HEADER lines"))
-    }
-   }
-
-  private def univocityParseCSV(
-     file: RDD[String],
-     header: Seq[String]): RDD[Array[String]] = {
-    // If header is set, make sure firstLine is materialized before sending to executors.
-    val filterLine = if (useHeader) firstLine else null
-    val dataLines = if (useHeader) file.filter(_ != filterLine) else file
-    val rows = dataLines.mapPartitionsWithIndex({
-      case (split, iter) =>
-        val escapeVal = if (escape == null) '\\' else escape.charValue()
-        val commentChar: Char = if (comment == null) '\0' else comment
-
-        new CarbonBulkCsvReader(iter, split,
-          headers = header, fieldSep = delimiter,
-          quote = quote, escape = escapeVal, commentMarker = commentChar,
-          ignoreLeadingSpace = ignoreLeadingWhiteSpace,
-          ignoreTrailingSpace = ignoreTrailingWhiteSpace)
-    }, true)
-    rows
-  }
-
-  private def parseCSV(
-      iter: Iterator[String],
-      csvFormat: CSVFormat): Iterator[Array[String]] = {
-    iter.flatMap { line =>
-      try {
-        val records = CSVParser.parse(line, csvFormat).getRecords
-        if (records.isEmpty) {
-          logger.warn(s"Ignoring empty line: $line")
-          None
-        } else {
-          Some(records.get(0).asScala.toArray)
-        }
-      } catch {
-        case NonFatal(e) if !failFast =>
-          logger.error(s"Exception while parsing line: $line. ", e)
-          None
-      }
-    }
-  }
-
-  // The function below was borrowed from JSONRelation
-  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
-    val filesystemPath = new Path(location)
-    val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-
-    if (overwrite) {
-      try {
-        fs.delete(filesystemPath, true)
-      } catch {
-        case e: IOException =>
-          throw new IOException(
-            s"Unable to clear output directory ${filesystemPath.toString} prior"
-              + s" to INSERT OVERWRITE a CSV table:\n${e.toString}")
-      }
-      // Write the data. We assume that schema isn't changed, and we won't update it.
-      data.saveAsCsvFile(location, Map("delimiter" -> delimiter.toString))
-    } else {
-      sys.error("CSV tables only support INSERT OVERWRITE for now.")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
deleted file mode 100644
index b5d7542..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.databricks.spark.csv.newapi
-
-import java.nio.charset.Charset
-
-import com.databricks.spark.csv.util.TextFile
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.{LongWritable, Text}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.{NewHadoopRDD, RDD}
-import org.apache.spark.util.FileUtils
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-
-/**
- * create RDD use CarbonDataLoadInputFormat
- */
-object CarbonTextFile {
-
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  def configSplitMaxSize(context: SparkContext, filePaths: String,
-      hadoopConfiguration: Configuration): Unit = {
-    val defaultParallelism = if (context.defaultParallelism < 1) {
-      1
-    } else {
-      context.defaultParallelism
-    }
-    val spaceConsumed = FileUtils.getSpaceOccupied(filePaths)
-    val blockSize =
-      hadoopConfiguration.getLongBytes("dfs.blocksize", CarbonCommonConstants.CARBON_256MB)
-    LOGGER.info("[Block Distribution]")
-    // calculate new block size to allow use all the parallelism
-    if (spaceConsumed < defaultParallelism * blockSize) {
-      var newSplitSize: Long = spaceConsumed / defaultParallelism
-      if (newSplitSize < CarbonCommonConstants.CARBON_16MB) {
-        newSplitSize = CarbonCommonConstants.CARBON_16MB
-      }
-      hadoopConfiguration.set(FileInputFormat.SPLIT_MAXSIZE, newSplitSize.toString)
-      LOGGER.info(s"totalInputSpaceConsumed: $spaceConsumed , " +
-          s"defaultParallelism: $defaultParallelism")
-      LOGGER.info(s"mapreduce.input.fileinputformat.split.maxsize: ${ newSplitSize.toString }")
-    }
-  }
-  private def newHadoopRDD(sc: SparkContext, location: String) = {
-    val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
-    hadoopConfiguration.setStrings(FileInputFormat.INPUT_DIR, location)
-    hadoopConfiguration.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true)
-    hadoopConfiguration.set("io.compression.codecs",
-      """org.apache.hadoop.io.compress.GzipCodec,
-         org.apache.hadoop.io.compress.DefaultCodec,
-         org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
-
-    configSplitMaxSize(sc, location, hadoopConfiguration)
-    new NewHadoopRDD[LongWritable, Text](
-      sc,
-      classOf[TextInputFormat],
-      classOf[LongWritable],
-      classOf[Text],
-      hadoopConfiguration).setName("newHadoopRDD-spark-csv")
-  }
-
-  def withCharset(sc: SparkContext, location: String, charset: String): RDD[String] = {
-    if (Charset.forName(charset) == TextFile.DEFAULT_CHARSET) {
-      newHadoopRDD(sc, location).map(pair => pair._2.toString)
-    } else {
-      // can't pass a Charset object here cause its not serializable
-      // TODO: maybe use mapPartitions instead?
-      newHadoopRDD(sc, location).map(
-        pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
deleted file mode 100644
index 89595e5..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.databricks.spark.csv.newapi
-
-import com.databricks.spark.csv.{CarbonCsvRelation, CsvSchemaRDD}
-import com.databricks.spark.csv.util.{ParserLibs, TextFile, TypeCast}
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
-import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StructType
-
-/**
- * Provides access to CSV data from pure SQL statements (i.e. for users of the
- * JDBC server).
- */
-class DefaultSource
-    extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {
-
-  private def checkPath(parameters: Map[String, String]): String = {
-    parameters.getOrElse("path", sys.error("'path' must be specified for CSV data."))
-  }
-
-  /**
-   * Creates a new relation for data store in CSV given parameters.
-   * Parameters have to include 'path' and optionally 'delimiter', 'quote', and 'header'
-   */
-  override def createRelation(sqlContext: SQLContext,
-      parameters: Map[String, String]): BaseRelation = {
-    createRelation(sqlContext, parameters, null)
-  }
-
-  /**
-   * Creates a new relation for data store in CSV given parameters and user supported schema.
-   * Parameters have to include 'path' and optionally 'delimiter', 'quote', and 'header'
-   */
-  override def createRelation(
-    sqlContext: SQLContext,
-    parameters: Map[String, String],
-    schema: StructType): BaseRelation = {
-    val path = checkPath(parameters)
-    val delimiter = TypeCast.toChar(parameters.getOrElse("delimiter", ","))
-
-    val quote = parameters.getOrElse("quote", "\"")
-    val quoteChar = if (quote.length == 1) {
-      quote.charAt(0)
-    } else {
-      throw new Exception("Quotation cannot be more than one character.")
-    }
-
-    val escape = parameters.getOrElse("escape", null)
-    val escapeChar: Character = if (escape == null || (escape.length == 0)) {
-      null
-    } else if (escape.length == 1) {
-      escape.charAt(0)
-    } else {
-      throw new Exception("Escape character cannot be more than one character.")
-    }
-
-    val comment = parameters.getOrElse("comment", "#")
-    val commentChar: Character = if (comment == null) {
-      null
-    } else if (comment.length == 1) {
-      comment.charAt(0)
-    } else {
-      throw new Exception("Comment marker cannot be more than one character.")
-    }
-
-    val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
-
-    val useHeader = parameters.getOrElse("header", "false")
-    val headerFlag = if (useHeader == "true") {
-      true
-    } else if (useHeader == "false") {
-      false
-    } else {
-      throw new Exception("Header flag can be true or false")
-    }
-
-    val parserLib = parameters.getOrElse("parserLib", ParserLibs.DEFAULT)
-    val ignoreLeadingWhiteSpace = parameters.getOrElse("ignoreLeadingWhiteSpace", "false")
-    val ignoreLeadingWhiteSpaceFlag = if (ignoreLeadingWhiteSpace == "false") {
-      false
-    } else if (ignoreLeadingWhiteSpace == "true") {
-      if (!ParserLibs.isUnivocityLib(parserLib)) {
-        throw new Exception("Ignore whitesspace supported for Univocity parser only")
-      }
-      true
-    } else {
-      throw new Exception("Ignore white space flag can be true or false")
-    }
-    val ignoreTrailingWhiteSpace = parameters.getOrElse("ignoreTrailingWhiteSpace", "false")
-    val ignoreTrailingWhiteSpaceFlag = if (ignoreTrailingWhiteSpace == "false") {
-      false
-    } else if (ignoreTrailingWhiteSpace == "true") {
-      if (!ParserLibs.isUnivocityLib(parserLib)) {
-        throw new Exception("Ignore whitespace supported for the Univocity parser only")
-      }
-      true
-    } else {
-      throw new Exception("Ignore white space flag can be true or false")
-    }
-
-    val charset = parameters.getOrElse("charset", TextFile.DEFAULT_CHARSET.name())
-    // TODO validate charset?
-
-    val inferSchema = parameters.getOrElse("inferSchema", "false")
-    val inferSchemaFlag = if (inferSchema == "false") {
-      false
-    } else if (inferSchema == "true") {
-      true
-    } else {
-      throw new Exception("Infer schema flag can be true or false")
-    }
-
-    CarbonCsvRelation(path,
-      headerFlag,
-      delimiter,
-      quoteChar,
-      escapeChar,
-      commentChar,
-      parseMode,
-      parserLib,
-      ignoreLeadingWhiteSpaceFlag,
-      ignoreTrailingWhiteSpaceFlag,
-      schema,
-      charset,
-      inferSchemaFlag)(sqlContext)
-  }
-
-  override def createRelation(
-    sqlContext: SQLContext,
-    mode: SaveMode,
-    parameters: Map[String, String],
-    data: DataFrame): BaseRelation = {
-    val path = checkPath(parameters)
-    val filesystemPath = new Path(path)
-    val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-    val doSave = if (fs.exists(filesystemPath)) {
-      mode match {
-        case SaveMode.Append =>
-          sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}")
-        case SaveMode.Overwrite =>
-          fs.delete(filesystemPath, true)
-          true
-        case SaveMode.ErrorIfExists =>
-          sys.error(s"path $path already exists.")
-        case SaveMode.Ignore => false
-      }
-    } else {
-      true
-    }
-
-    val codec: Class[_ <: CompressionCodec] =
-      parameters.getOrElse("codec", "none") match {
-        case "gzip" => classOf[GzipCodec]
-        case _ => null
-      }
-
-    if (doSave) {
-      // Only save data when the save mode is not ignore.
-      data.saveAsCsvFile(path, parameters, codec)
-    }
-
-    createRelation(sqlContext, parameters, data.schema)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 9d9fa99..53ebd41 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -229,6 +229,36 @@ class CarbonAllDictionaryCombineRDD(
   }
 }
 
+class StringArrayRow(var values: Array[String]) extends Row {
+
+  override def length: Int = values.length
+
+  override def get(i: Int): Any = values(i)
+
+  override def getString(i: Int): String = values(i)
+
+  private def reset(): Unit = {
+    for (i <- 0 until values.length) {
+      values(i) = null
+    }
+  }
+
+  override def copy(): Row = {
+    val tmpValues = new Array[String](values.length)
+    System.arraycopy(values, 0, tmpValues, 0, values.length)
+    new StringArrayRow(tmpValues)
+  }
+
+  def setValues(values: Array[String]): StringArrayRow = {
+    reset()
+    if (values != null) {
+      val minLength = Math.min(this.values.length, values.length)
+      System.arraycopy(values, 0, this.values, 0, minLength)
+    }
+    this
+  }
+}
+
 /**
  * A RDD to combine distinct values in block.
  *
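
For context, a minimal usage sketch of the StringArrayRow introduced above (not part of the patch; the object name is illustrative and it only assumes the class is on the classpath as committed). The point it shows: setValues() resets and reuses one backing array, so copy() is needed before a row is retained across records.

import org.apache.carbondata.spark.rdd.StringArrayRow

object StringArrayRowSketch {
  def main(args: Array[String]): Unit = {
    // one mutable row instance, reused for every incoming record
    val row = new StringArrayRow(new Array[String](3))
    row.setValues(Array("a", "b", "c"))
    println(row.getString(0))      // "a"
    val snapshot = row.copy()      // copies the backing array, safe to retain
    row.setValues(Array("x"))      // nulls all slots, then copies the shorter input
    println(snapshot.getString(2)) // still "c"
    println(row.get(2))            // null after the reset
  }
}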

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 22d8406..497df75 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -50,7 +50,7 @@ import org.apache.carbondata.processing.newflow.DataLoadExecutor
 import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
 import org.apache.carbondata.spark.DataLoadResult
 import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil}
+import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil, CommonUtil}
 
 class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
 
@@ -180,7 +180,7 @@ class NewCarbonDataLoadRDD[K, V](
         if (configuration == null) {
           configuration = new Configuration()
         }
-        configureCSVInputFormat(configuration)
+        CommonUtil.configureCSVInputFormat(configuration, carbonLoadModel)
         val hadoopAttemptContext = new TaskAttemptContextImpl(configuration, attemptId)
         val format = new CSVInputFormat
         if (isTableSplitPartition) {
@@ -236,18 +236,6 @@ class NewCarbonDataLoadRDD[K, V](
         }
       }
 
-      def configureCSVInputFormat(configuration: Configuration): Unit = {
-        CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar)
-        CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter)
-        CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar)
-        CSVInputFormat.setHeaderExtractionEnabled(configuration,
-          carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty)
-        CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar)
-        CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance
-          .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
-            CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT))
-      }
-
       /**
        * generate blocks id
        *

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 6766a39..8285bf8 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -22,14 +22,23 @@ import java.util
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Map
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.spark.SparkContext
 import org.apache.spark.sql.execution.command.{ColumnProperty, Field}
+import org.apache.spark.util.FileUtils
 
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.hadoop.csv.CSVInputFormat
 import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 object CommonUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
   def validateColumnGroup(colGroup: String, noDictionaryDims: Seq[String],
       msrs: Seq[Field], retrievedColGrps: Seq[String], dims: Seq[Field]) {
     val colGrpCols = colGroup.split(',').map(_.trim)
@@ -255,4 +264,41 @@ object CommonUtil {
     val details = SegmentStatusManager.readLoadMetadata(metadataPath)
     model.setLoadMetadataDetails(details.toList.asJava)
   }
+
+  def configureCSVInputFormat(configuration: Configuration,
+      carbonLoadModel: CarbonLoadModel): Unit = {
+    CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar)
+    CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter)
+    CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar)
+    CSVInputFormat.setHeaderExtractionEnabled(configuration,
+      carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty)
+    CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar)
+    CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance
+      .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
+        CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT))
+  }
+
+  def configSplitMaxSize(context: SparkContext, filePaths: String,
+      hadoopConfiguration: Configuration): Unit = {
+    val defaultParallelism = if (context.defaultParallelism < 1) {
+      1
+    } else {
+      context.defaultParallelism
+    }
+    val spaceConsumed = FileUtils.getSpaceOccupied(filePaths)
+    val blockSize =
+      hadoopConfiguration.getLongBytes("dfs.blocksize", CarbonCommonConstants.CARBON_256MB)
+    LOGGER.info("[Block Distribution]")
+    // calculate new block size to allow use all the parallelism
+    if (spaceConsumed < defaultParallelism * blockSize) {
+      var newSplitSize: Long = spaceConsumed / defaultParallelism
+      if (newSplitSize < CarbonCommonConstants.CARBON_16MB) {
+        newSplitSize = CarbonCommonConstants.CARBON_16MB
+      }
+      hadoopConfiguration.set(FileInputFormat.SPLIT_MAXSIZE, newSplitSize.toString)
+      LOGGER.info(s"totalInputSpaceConsumed: $spaceConsumed , " +
+                  s"defaultParallelism: $defaultParallelism")
+      LOGGER.info(s"mapreduce.input.fileinputformat.split.maxsize: ${ newSplitSize.toString }")
+    }
+  }
 }
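
A minimal sketch of how the two helpers above are wired together by their callers in this patch (NewCarbonDataLoadRDD and CarbonDataRDDFactory). The object and method names here are illustrative; the SparkContext, load model and CSV paths are assumed to be supplied by the surrounding load flow.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext

import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.util.CommonUtil

object CsvConfigSketch {
  def prepareHadoopConf(sc: SparkContext,
      carbonLoadModel: CarbonLoadModel,
      csvPaths: String): Configuration = {
    val conf = new Configuration(sc.hadoopConfiguration)
    // push delimiter, quote, escape, comment and header settings from the load model
    CommonUtil.configureCSVInputFormat(conf, carbonLoadModel)
    // shrink the max split size so the input can use all available parallelism
    CommonUtil.configSplitMaxSize(sc, csvPaths, conf)
    conf
  }
}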

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 84c71ed..e87b27b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -27,9 +27,13 @@ import scala.language.implicitConversions
 import scala.util.control.Breaks.{break, breakable}
 
 import org.apache.commons.lang3.{ArrayUtils, StringUtils}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.Accumulator
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{NewHadoopRDD, RDD}
 import org.apache.spark.sql._
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.util.FileUtils
 
 import org.apache.carbondata.common.factory.CarbonCommonFactory
@@ -45,8 +49,11 @@ import org.apache.carbondata.core.datastorage.store.impl.FileFactory
 import org.apache.carbondata.core.reader.CarbonDictionaryReader
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.writer.CarbonDictionaryWriter
+import org.apache.carbondata.hadoop.csv.CSVInputFormat
+import org.apache.carbondata.hadoop.io.StringArrayWritable
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
 import org.apache.carbondata.spark.CarbonSparkFactory
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
 import org.apache.carbondata.spark.rdd._
@@ -357,37 +364,49 @@ object GlobalDictionaryUtil {
    */
   def loadDataFrame(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel): DataFrame = {
-    val df = sqlContext.read
-      .format("com.databricks.spark.csv.newapi")
-      .option("header", {
-        if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
-          "true"
-        } else {
-          "false"
-        }
-      })
-      .option("delimiter", {
-        if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
-          "" + DEFAULT_SEPARATOR
-        } else {
-          carbonLoadModel.getCsvDelimiter
+      val hadoopConfiguration = new Configuration()
+      CommonUtil.configureCSVInputFormat(hadoopConfiguration, carbonLoadModel)
+      hadoopConfiguration.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
+      val header = if (StringUtils.isBlank(carbonLoadModel.getCsvHeader)) {
+        val fileHeader = CarbonUtil.readHeader(carbonLoadModel.getFactFilePath.split(",")(0))
+        if (StringUtils.isBlank(fileHeader)) {
+          throw new CarbonDataLoadingException("First line of the csv is not valid.");
         }
-      })
-      .option("parserLib", "univocity")
-      .option("escape", carbonLoadModel.getEscapeChar)
-      .option("ignoreLeadingWhiteSpace", "false")
-      .option("ignoreTrailingWhiteSpace", "false")
-      .option("codec", "gzip")
-      .option("quote", {
-        if (StringUtils.isEmpty(carbonLoadModel.getQuoteChar)) {
-          "" + DEFAULT_QUOTE_CHARACTER
+        fileHeader
+      } else {
+        carbonLoadModel.getCsvHeader
+      }
+      val delimiter = if (carbonLoadModel.getCsvDelimiter == null) {
+        CarbonCommonConstants.COMMA
+      } else {
+        carbonLoadModel.getCsvDelimiter
+      }
+      val quote = if (carbonLoadModel.getQuoteChar == null) {
+        "\""
+      } else {
+        carbonLoadModel.getQuoteChar
+      }
+      val columnNames = header.split(delimiter).map { column =>
+        if ( column.startsWith(quote) && column.endsWith(quote)) {
+          column.substring(1, column.length - 1)
         } else {
-          carbonLoadModel.getQuoteChar
+          column
         }
+      }
+      val schema = StructType(columnNames.map[StructField, Array[StructField]]{ column =>
+        StructField(column, StringType)
       })
-      .option("comment", carbonLoadModel.getCommentChar)
-      .load(carbonLoadModel.getFactFilePath)
-    df
+      val values = new Array[String](columnNames.length)
+      val row = new StringArrayRow(values)
+      val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
+        sqlContext.sparkContext,
+        classOf[CSVInputFormat],
+        classOf[NullWritable],
+        classOf[StringArrayWritable],
+        hadoopConfiguration).setName("global dictionary").map[Row] { currentRow =>
+          row.setValues(currentRow._2.get())
+      }
+      sqlContext.createDataFrame(rdd, schema)
   }
 
   // Hack for spark2 integration
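
A condensed sketch of the read path implemented by the rewritten loadDataFrame above: CSVInputFormat emits StringArrayWritable records, which become an all-string DataFrame. The object and method names are illustrative; the committed code additionally reuses one mutable StringArrayRow per partition, while this sketch copies each record into an immutable Row for clarity.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import org.apache.carbondata.hadoop.csv.CSVInputFormat
import org.apache.carbondata.hadoop.io.StringArrayWritable

object CsvDataFrameSketch {
  def csvToDataFrame(sqlContext: SQLContext,
      inputDir: String,
      columnNames: Array[String]): DataFrame = {
    val conf = new Configuration()
    conf.set(FileInputFormat.INPUT_DIR, inputDir)
    // every CSV column is read as a nullable string
    val schema = StructType(columnNames.map(name => StructField(name, StringType)))
    val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
      sqlContext.sparkContext,
      classOf[CSVInputFormat],
      classOf[NullWritable],
      classOf[StringArrayWritable],
      conf).map[Row] { pair => Row.fromSeq(pair._2.get()) }
    sqlContext.createDataFrame(rdd, schema)
  }
}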

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml
index 9d584b9..ad921c0 100644
--- a/integration/spark/pom.xml
+++ b/integration/spark/pom.xml
@@ -35,11 +35,6 @@
 
   <dependencies>
     <dependency>
-      <groupId>com.databricks</groupId>
-      <artifactId>spark-csv_${scala.binary.version}</artifactId>
-      <version>1.2.0</version>
-    </dependency>
-    <dependency>
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-common</artifactId>
       <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
index 0954374..ef85635 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
@@ -18,6 +18,7 @@
 package org.apache.carbondata.spark
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.command.LoadTable
 import org.apache.spark.sql.types._
@@ -103,17 +104,38 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) {
   }
 
   private def writeToTempCSVFile(tempCSVFolder: String, options: CarbonOption): Unit = {
-    var writer: DataFrameWriter =
-      dataFrame.write
-        .format(csvPackage)
-        .option("header", "false")
-        .mode(SaveMode.Overwrite)
 
-    if (options.compress) {
-      writer = writer.option("codec", "gzip")
+    val strRDD = dataFrame.rdd.mapPartitions { case iter =>
+      new Iterator[String] {
+        override def hasNext = iter.hasNext
+
+        def convertToCSVString(seq: Seq[Any]): String = {
+          val build = new java.lang.StringBuilder()
+          if (seq.head != null) {
+            build.append(seq.head.toString)
+          }
+          val itemIter = seq.tail.iterator
+          while (itemIter.hasNext) {
+            build.append(CarbonCommonConstants.COMMA)
+            val value = itemIter.next()
+            if (value != null) {
+              build.append(value.toString)
+            }
+          }
+          build.toString
+        }
+
+        override def next: String = {
+          convertToCSVString(iter.next.toSeq)
+        }
+      }
     }
 
-    writer.save(tempCSVFolder)
+    if (options.compress) {
+      strRDD.saveAsTextFile(tempCSVFolder, classOf[GzipCodec])
+    } else {
+      strRDD.saveAsTextFile(tempCSVFolder)
+    }
   }
 
   /**
@@ -134,8 +156,6 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) {
       Some(dataFrame)).run(cc)
   }
 
-  private def csvPackage: String = "com.databricks.spark.csv.newapi"
-
   private def convertToCarbonType(sparkType: DataType): String = {
     sparkType match {
       case StringType => CarbonType.STRING.getName
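
A standalone sketch of the replacement write path in writeToTempCSVFile above: rows are rendered to comma-separated strings and written with saveAsTextFile, gzip-compressed when requested. The object name and parameters are illustrative; the committed code streams rows through a custom Iterator instead of the simpler map used here, and like the original it does not quote fields.

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.DataFrame

object TempCsvWriterSketch {
  def writeCsv(df: DataFrame, outputFolder: String, compress: Boolean): Unit = {
    // null values are written as empty fields, matching the writer in this patch
    val lines = df.rdd.map { row =>
      row.toSeq.map(v => if (v == null) "" else v.toString).mkString(",")
    }
    if (compress) {
      lines.saveAsTextFile(outputFolder, classOf[GzipCodec])
    } else {
      lines.saveAsTextFile(outputFolder)
    }
  }
}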

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 1f8af66..8504fc8 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -25,7 +25,6 @@ import scala.collection.mutable.ListBuffer
 import scala.util.Random
 import scala.util.control.Breaks._
 
-import com.databricks.spark.csv.newapi.CarbonTextFile
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.Job
@@ -551,7 +550,7 @@ object CarbonDataRDDFactory {
                org.apache.hadoop.io.compress.DefaultCodec,
                org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
 
-          CarbonTextFile.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration)
+          CommonUtil.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration)
 
           val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat
           inputFormat match {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index 20569df..2091598 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -21,11 +21,9 @@ import java.util
 import java.util.LinkedHashSet
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
 import scala.language.implicitConversions
 
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.SchemaRDD
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.AttributeReference

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/DefaultSourceTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/DefaultSourceTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/DefaultSourceTestCase.scala
deleted file mode 100644
index 20229bb..0000000
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/DefaultSourceTestCase.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.spark.testsuite.dataload
-
-import java.io.BufferedWriter
-import java.io.File
-import java.io.FileWriter
-import java.util.Random
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.spark.sql.common.util.CarbonHiveContext
-import org.apache.spark.sql.common.util.CarbonHiveContext.sql
-import org.apache.spark.sql.common.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-/**
- * Test Case for new defaultsource: com.databricks.spark.csv.newapi
- *
- * @date: Apr 10, 2016 10:34:58 PM
- * @See org.apache.carbondata.spark.util.GlobalDictionaryUtil
- */
-class DefaultSourceTestCase extends QueryTest with BeforeAndAfterAll {
-
-  var filePath: String = _
-
-  override def beforeAll {
-    buildTestData
-    buildTable
-  }
-
-  def buildTestData() = {
-    val workDirectory = new File(this.getClass.getResource("/").getPath + "/../../").getCanonicalPath
-    filePath = workDirectory + "/target/defaultsource.csv"
-    val file = new File(filePath)
-    val writer = new BufferedWriter(new FileWriter(file))
-    writer.write("c1,c2,c3,c4")
-    writer.newLine()
-    var i = 0
-    val random = new Random
-    for (i <- 0 until 2000000) {
-      writer.write("   aaaaaaa" + i + "  ,   " +
-        "bbbbbbb" + i % 1000 + "," +
-        i % 1000000 + "," + i % 10000 + "\n")
-    }
-    writer.close
-  }
-
-  def buildTable() = {
-    try {
-      sql("drop table if exists defaultsource")
-      sql("""create table if not exists defaultsource
-             (c1 string, c2 string, c3 int, c4 int)
-             STORED BY 'org.apache.carbondata.format'""")
-    } catch {
-      case ex: Throwable => LOGGER.error(ex.getMessage + "\r\n" + ex.getStackTraceString)
-    }
-  }
-
-  test("test new defaultsource: com.databricks.spark.csv.newapi") {
-    val df1 = CarbonHiveContext.read
-      .format("com.databricks.spark.csv.newapi")
-      .option("header", "true")
-      .option("delimiter", ",")
-      .option("parserLib", "univocity")
-      .option("ignoreLeadingWhiteSpace", "true")
-      .option("ignoreTrailingWhiteSpace", "true")
-      .load(filePath)
-
-    assert(!df1.first().getString(0).startsWith(" "))
-    assert(df1.count() == 2000000)
-    assert(df1.rdd.partitions.length == 3)
-  }
-
-  test("test defaultsource: com.databricks.spark.csv") {
-    val df2 = CarbonHiveContext.read
-      .format("com.databricks.spark.csv")
-      .option("header", "true")
-      .option("delimiter", ",")
-      .option("parserLib", "univocity")
-      .option("ignoreLeadingWhiteSpace", "true")
-      .option("ignoreTrailingWhiteSpace", "true")
-      .load(filePath)
-
-    assert(!df2.first().getString(0).startsWith(" "))
-    assert(df2.count() == 2000000)
-    assert(df2.rdd.partitions.length == 3)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
index fa8731a..c5bc058 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
@@ -56,6 +56,7 @@ class TestLoadDataWithNotProperInputFile extends QueryTest {
       GlobalDictionaryUtil.loadDataFrame(CarbonHiveContext, carbonLoadModel)
     } catch {
       case e: Throwable =>
+        e.printStackTrace()
         assert(e.getMessage.contains("Please check your input path and make sure " +
           "that files end with '.csv' and content is not empty"))
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 96c4a08..e147003 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -25,7 +25,6 @@ import scala.collection.mutable.ListBuffer
 import scala.util.Random
 import scala.util.control.Breaks._
 
-import com.databricks.spark.csv.newapi.CarbonTextFile
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.Job
@@ -547,7 +546,7 @@ object CarbonDataRDDFactory {
                org.apache.hadoop.io.compress.DefaultCodec,
                org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
 
-          CarbonTextFile.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration)
+          CommonUtil.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration)
 
           val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat
           inputFormat match {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e9093755/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c91f186..c1bfcbe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,7 +102,6 @@
 
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <spark.csv.version>1.2.0</spark.csv.version>
     <snappy.version>1.1.2.6</snappy.version>
     <hadoop.version>2.2.0</hadoop.version>
     <kettle.version>4.4.0-stable</kettle.version>

