carbondata-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (CARBONDATA-2) Remove kettle for loading data
Date Sat, 05 Nov 2016 13:48:58 GMT

    [ https://issues.apache.org/jira/browse/CARBONDATA-2?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15639686#comment-15639686 ]

ASF GitHub Bot commented on CARBONDATA-2:
-----------------------------------------

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/263#discussion_r86665514
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---
    @@ -0,0 +1,281 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.spark.rdd
    +
    +import java.lang.Long
    +import java.text.SimpleDateFormat
    +import java.util
    +import java.util.{Date, UUID}
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.io.NullWritable
    +import org.apache.hadoop.mapreduce.RecordReader
    +import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
    +import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, CarbonSerializableConfiguration}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.execution.command.Partitioner
    +import org.apache.spark.util.SerializableConfiguration
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.common.logging.impl.StandardLogService
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
    +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
    +import org.apache.carbondata.hadoop.csv.CSVInputFormat
    +import org.apache.carbondata.hadoop.io.StringArrayWritable
    +import org.apache.carbondata.processing.graphgenerator.GraphGenerator
    +import org.apache.carbondata.spark.DataLoadResult
    +import org.apache.carbondata.spark.load._
    +import org.apache.carbondata.spark.splits.TableSplit
    +import org.apache.carbondata.spark.util.CarbonQueryUtil
    +
    +/**
    + * It loads the data to carbon using @AbstractDataLoadProcessorStep
    + */
    +class NewCarbonDataLoadRDD[K, V](
    +    sc: SparkContext,
    +    result: DataLoadResult[K, V],
    +    carbonLoadModel: CarbonLoadModel,
    +    var storeLocation: String,
    +    hdfsStoreLocation: String,
    +    kettleHomePath: String,
    +    partitioner: Partitioner,
    +    columinar: Boolean,
    +    loadCount: Integer,
    +    tableCreationTime: Long,
    +    schemaLastUpdatedTime: Long,
    +    blocksGroupBy: Array[(String, Array[BlockDetails])],
    +    isTableSplitPartition: Boolean)
    +  extends RDD[(K, V)](sc, Nil) with CarbonHadoopMapReduceUtil with Logging {
    +
    +  sc.setLocalProperty("spark.scheduler.pool", "DDL")
    +
    +  private val jobTrackerId: String = {
    +    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    +    formatter.format(new Date())
    +  }
    +
    +  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
    +  private val confBroadcast =
    +    sc.broadcast(new CarbonSerializableConfiguration(sc.hadoopConfiguration))
    +
    +  override def getPartitions: Array[Partition] = {
    +    if (isTableSplitPartition) {
    +      // for table split partition
    +      var splits = Array[TableSplit]()
    +
    +      if (carbonLoadModel.isDirectLoad) {
    +        splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
    +          partitioner.nodeList, partitioner.partitionCount)
    +      }
    +      else {
    +        splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
    +          carbonLoadModel.getTableName, null, partitioner)
    +      }
    +
    +      splits.zipWithIndex.map { s =>
    +        // filter the same partition unique id, because only one will match, so get 0 element
    +        val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter(p =>
    +          p._1 == s._1.getPartition.getUniqueID)(0)._2
    +        new CarbonTableSplitPartition(id, s._2, s._1, blocksDetails)
    +      }
    +    } else {
    +      // for node partition
    +      blocksGroupBy.zipWithIndex.map { b =>
    +        new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
    +      }
    +    }
    +  }
    +
    +  override def checkpoint() {
    +    // Do nothing. Hadoop RDD should not be checkpointed.
    +  }
    +
    +  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
    +    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
    +    val iter = new Iterator[(K, V)] {
    +      var partitionID = "0"
    +      val loadMetadataDetails = new LoadMetadataDetails()
    +      var model: CarbonLoadModel = _
    +      var uniqueLoadStatusId =
    +        carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
    +      try {
    +        loadMetadataDetails.setPartitionCount(partitionID)
    +        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
    +
    +        carbonLoadModel.setSegmentId(String.valueOf(loadCount))
    +        val recordReaders = getRecordReaders
    +        val loader = new SparkPartitionLoader(model,
    +          theSplit.index,
    +          hdfsStoreLocation,
    +          kettleHomePath,
    +          loadCount,
    +          loadMetadataDetails)
    +        // Intialize to set carbon properties
    +        loader.initialize()
    +        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
    +        CarbonLoaderUtil.executeNewDataLoad(model,
    +          loader.storeLocation,
    +          hdfsStoreLocation,
    +          recordReaders)
    +      } catch {
    +        case e: Exception =>
    +          logInfo("DataLoad failure")
    +          LOGGER.error(e)
    +          throw e
    +      }
    +
    +      def getRecordReaders: Array[RecordReader[NullWritable, StringArrayWritable]] = {
    +        val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, theSplit.index, 0)
    +        val configuration: Configuration = confBroadcast.value.value
    +        configureCSVInputFormat(configuration)
    +        val hadoopAttemptContext = newTaskAttemptContext(configuration, attemptId)
    +        val format = new CSVInputFormat
    +        if (isTableSplitPartition) {
    +          // for table split partition
    +          val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
    +          logInfo("Input split: " + split.serializableHadoopSplit.value)
    +          carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
    +          if (carbonLoadModel.isDirectLoad) {
    +            model = carbonLoadModel.getCopyWithPartition(
    +                split.serializableHadoopSplit.value.getPartition.getUniqueID,
    +                split.serializableHadoopSplit.value.getPartition.getFilesPath,
    +                carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
    +          } else {
    +            model = carbonLoadModel.getCopyWithPartition(
    +                split.serializableHadoopSplit.value.getPartition.getUniqueID)
    +          }
    +          partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
    +
    +          StandardLogService.setThreadName(partitionID, null)
    +          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap(
    +              partitionID, split.partitionBlocksDetail.length)
    +          val readers =
    +          split.partitionBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
    +          readers.zipWithIndex
    +            .foreach(f => f._1.initialize(split.partitionBlocksDetail(f._2), hadoopAttemptContext))
    --- End diff --
    
    Change `f` to `case (reader, index)` to make it more readable,
    and move the `.foreach` call up onto the previous line.


> Remove kettle for loading data
> ------------------------------
>
>                 Key: CARBONDATA-2
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-2
>             Project: CarbonData
>          Issue Type: Improvement
>          Components: data-load
>            Reporter: Liang Chen
>            Priority: Critical
>              Labels: features
>             Fix For: 0.3.0-incubating
>
>         Attachments: CarbonDataLoadingdesign.pdf
>
>
> Remove kettle for loading data module



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message