carbondata-issues mailing list archives

From: ravipesala <...@git.apache.org>
Subject: [GitHub] carbondata pull request #1672: [CARBONDATA-1858][PARTITION] Support querying...
Date: Sat, 16 Dec 2017 17:46:12 GMT
GitHub user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1672#discussion_r157347018
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
---
    @@ -345,23 +380,172 @@ case class CarbonLoadDataCommand(
         } else {
           (dataFrame, dataFrame)
         }
    -    if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) {
    +    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
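    +    // Global dictionary generation applies only to top-level tables; child
    +    // datamap tables skip it.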
    +    if (!table.isChildDataMap) {
           GlobalDictionaryUtil.generateGlobalDictionary(
             sparkSession.sqlContext,
             carbonLoadModel,
             hadoopConf,
             dictionaryDataFrame)
         }
    -    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
    -      carbonLoadModel,
    -      columnar,
    -      partitionStatus,
    -      None,
    -      isOverwriteTable,
    -      hadoopConf,
    -      loadDataFrame,
    -      updateModel,
    -      operationContext)
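    +    // Standard (Hive-style) partition tables go through Spark's insert flow;
    +    // all other tables keep the existing CarbonDataRDDFactory load path.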
    +    if (table.isStandardPartitionTable) {
    +      loadStandardPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame)
    +    } else {
    +      CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
    +        carbonLoadModel,
    +        columnar,
    +        partitionStatus,
    +        None,
    +        isOverwriteTable,
    +        hadoopConf,
    +        loadDataFrame,
    +        updateModel,
    +        operationContext)
    +    }
    +  }
    +
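    +  /**
    +   * Loads data into a standard partition table by building a logical plan
    +   * over the input (a DataFrame or CSV files) and running it through Spark's
    +   * InsertIntoTable command.
    +   */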
    +  private def loadStandardPartition(sparkSession: SparkSession,
    +      carbonLoadModel: CarbonLoadModel,
    +      hadoopConf: Configuration,
    +      dataFrame: Option[DataFrame]) = {
    +    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
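    +    // Resolve the target table in the session catalog and pick out its
    +    // LogicalRelation node.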
    +    val logicalPlan =
    +      sparkSession.sessionState.catalog.lookupRelation(
    +        TableIdentifier(table.getTableName, Some(table.getDatabaseName)))
    +    val relation = logicalPlan.collect {
    +      case l: LogicalRelation => l
    +    }.head
    +
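    +    // Build the source plan: either convert the incoming DataFrame's rows to
    +    // strings, or read the CSV fact files directly.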
    +    val query: LogicalPlan = if (dataFrame.isDefined) {
    +      var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
    +      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +      var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
    +      val dateFormat = new SimpleDateFormat(dateFormatString)
    +      val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
    +      val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
    +      val serializationNullFormat =
    +      carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
    +      val attributes =
    +        StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
    +      val len = attributes.length
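    +      // Convert every field to a string in the configured formats so the plan
    +      // carries a uniform string schema.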
    +      val rdd = dataFrame.get.rdd.map { f =>
    +        val data = new Array[Any](len)
    +        var i = 0
    +        while (i < len) {
    +          data(i) =
    +            UTF8String.fromString(
    +              CarbonScalaUtil.getString(f.get(i),
    +                serializationNullFormat,
    +                delimiterLevel1,
    +                delimiterLevel2,
    +                timeStampFormat,
    +                dateFormat))
    +          i = i + 1
    +        }
    +        InternalRow.fromSeq(data)
    +      }
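    +      // Expose the converted rows as a logical plan node for the insert.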
    +      LogicalRDD(attributes, rdd)(sparkSession)
    +
    +    } else {
    +      var timeStampformatString = carbonLoadModel.getTimestampformat
    +      if (timeStampformatString.isEmpty) {
    +        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
    +      }
    +      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +      var dateFormatString = carbonLoadModel.getDateFormat
    +      if (dateFormatString.isEmpty) {
    +        dateFormatString = carbonLoadModel.getDefaultDateFormat
    +      }
    +      val dateFormat = new SimpleDateFormat(dateFormatString)
    +      // input data from csv files. Convert to logical plan
    +      CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
    +      hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
    +      val jobConf = new JobConf(hadoopConf)
    +      SparkHadoopUtil.get.addCredentials(jobConf)
    +      val attributes =
    +        StructType(carbonLoadModel.getCsvHeaderColumns.map(
    +          StructField(_, StringType))).toAttributes
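    +      // Match each CSV header column against the table schema to recover its
    +      // target data type; unmatched columns default to StringType.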
    +      val rowDataTypes = attributes.map{f =>
    +        relation.output.find(_.name.equalsIgnoreCase(f.name)) match {
    +          case Some(attr) => attr.dataType
    +          case _ => StringType
    +        }
    +      }
    +      val len = rowDataTypes.length
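    +      // Read the fact files with CSVInputFormat, normalizing date and
    +      // timestamp fields to the resolved formats.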
    +      val rdd =
    +        new NewHadoopRDD[NullWritable, StringArrayWritable](
    +          sparkSession.sparkContext,
    +          classOf[CSVInputFormat],
    +          classOf[NullWritable],
    +          classOf[StringArrayWritable],
    +          jobConf).map{f =>
    +            val data = new Array[Any](len)
    +            var i = 0
    +            while (i < len) {
    +              // TODO find a way to avoid double conversion of date and time.
    +              data(i) = CarbonScalaUtil.getString(
    +                f._2.get()(i),
    +                rowDataTypes(i),
    +                timeStampFormat,
    +                dateFormat)
    +              i = i + 1
    +            }
    +            InternalRow.fromSeq(data)
    +        }
    +
    +      // Only select the required columns
    +      Project(relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get),
    +        LogicalRDD(attributes, rdd)(sparkSession))
    +    }
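    +    // Run the load as a regular Spark insert so partition handling is
    +    // delegated to the datasource write path.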
    +    Dataset.ofRows(sparkSession, InsertIntoTable(
    +      convertToLogicalRelation(relation, isOverwriteTable, carbonLoadModel, sparkSession),
    +      partition,
    +      query,
    +      OverwriteOptions(isOverwriteTable), false))
    +  }
    +
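    +  /**
    +   * Rebuilds the catalog relation with a string-typed metastore schema and
    +   * the Carbon partition schema so the insert can resolve the table's
    +   * partitions.
    +   */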
    +  private def convertToLogicalRelation(
    +      relation: LogicalRelation,
    +      overWrite: Boolean,
    +      loadModel: CarbonLoadModel,
    +      sparkSession: SparkSession): LogicalRelation = {
    +    val catalogTable = relation.catalogTable.get
    +    val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val metastoreSchema = StructType(StructType.fromAttributes(
    +      relation.output).fields.map(_.copy(dataType = StringType)))
    +    val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
    +    val catalog = new CatalogFileIndex(
    +      sparkSession, catalogTable, relation.relation.sizeInBytes)
    +    if (lazyPruningEnabled) {
    +      catalog
    +    } else {
    +      catalog.filterPartitions(Nil) // materialize all the partitions in memory
    +    }
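    +    // Derive the partition schema by matching Carbon's partition columns
    +    // against the metastore schema.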
    +    val partitionSchema =
    +      StructType(table.getPartitionInfo(table.getTableName).getColumnSchemaList.asScala.map(f =>
    +      metastoreSchema.fields.find(_.name.equalsIgnoreCase(f.getColumnName))).map(_.get))
    --- End diff --
    
    ok


---
