carbondata-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jackylk <...@git.apache.org>
Subject [GitHub] carbondata pull request #1559: [CARBONDATA-1805][Dictionary] Optimize prunin...
Date Thu, 14 Dec 2017 07:52:37 GMT
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1559#discussion_r156873416
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
---
    @@ -348,36 +347,53 @@ object GlobalDictionaryUtil {
       }
     
       /**
    -   * load CSV files to DataFrame by using datasource "com.databricks.spark.csv"
    +   * load and prune dictionary Rdd from csv file or input dataframe
        *
    -   * @param sqlContext      SQLContext
    -   * @param carbonLoadModel carbon data load model
    +   * @param sqlContext sqlContext
    +   * @param carbonLoadModel carbonLoadModel
    +   * @param inputDF input dataframe
    +   * @param requiredCols names of dictionary column
    +   * @param hadoopConf hadoop configuration
    +   * @return rdd that contains only dictionary columns
        */
    -  def loadDataFrame(sqlContext: SQLContext,
    -      carbonLoadModel: CarbonLoadModel,
    -      hadoopConf: Configuration): DataFrame = {
    -    CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
    -    hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
    -    val columnNames = carbonLoadModel.getCsvHeaderColumns
    -    val schema = StructType(columnNames.map[StructField, Array[StructField]] { column
=>
    -      StructField(column, StringType)
    -    })
    -    val values = new Array[String](columnNames.length)
    -    val row = new StringArrayRow(values)
    -    val jobConf = new JobConf(hadoopConf)
    -    SparkHadoopUtil.get.addCredentials(jobConf)
    -    TokenCache.obtainTokensForNamenodes(jobConf.getCredentials,
    -      Array[Path](new Path(carbonLoadModel.getFactFilePath)),
    -      jobConf)
    -    val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
    -      sqlContext.sparkContext,
    -      classOf[CSVInputFormat],
    -      classOf[NullWritable],
    -      classOf[StringArrayWritable],
    -      jobConf).setName("global dictionary").map[Row] { currentRow =>
    -      row.setValues(currentRow._2.get())
    +  private def loadInputDataAsDictRdd(sqlContext: SQLContext, carbonLoadModel: CarbonLoadModel,
    +      inputDF: Option[DataFrame], requiredCols: Array[String],
    +      hadoopConf: Configuration): RDD[Row] = {
    +    if (inputDF.isDefined) {
    +      inputDF.get.select(requiredCols.head, requiredCols.tail : _*).rdd
    +    } else {
    +      CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
    +      hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
    +      val headerCols = carbonLoadModel.getCsvHeaderColumns.map(_.toLowerCase)
    +      val header2Idx = headerCols.zipWithIndex.toMap
    +      // index of dictionary columns in header
    +      val dictColIdx = requiredCols.map(c => header2Idx(c.toLowerCase))
    +
    +      val jobConf = new JobConf(hadoopConf)
    +      SparkHadoopUtil.get.addCredentials(jobConf)
    +      TokenCache.obtainTokensForNamenodes(jobConf.getCredentials,
    +        Array[Path](new Path(carbonLoadModel.getFactFilePath)),
    +        jobConf)
    +      val dictRdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
    +        sqlContext.sparkContext,
    +        classOf[CSVInputFormat],
    +        classOf[NullWritable],
    +        classOf[StringArrayWritable],
    +        jobConf).setName("global dictionary").map[Row] { currentRow =>
    --- End diff --
    
    Please move the `setName` and `map` calls onto separate lines.


---

Mime
View raw message