carbondata-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jackylk <...@git.apache.org>
Subject [GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...
Date Wed, 27 Sep 2017 13:36:34 GMT
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r141344321
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
    @@ -217,22 +229,212 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
        * be put here.  For example, user defined output committer can be configured here
        * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
        */
    -  def prepareWrite(
    +  override def prepareWrite(
    +      sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
    +
    +    // Check if table with given path exists
    +    validateTable(options.get("path").get)
    +
    +    /* Check if streaming data schema matches with carbon table schema
    +     * Data from socket source does not have schema attached to it,
    +     * Following check is to ignore schema validation for socket source.
    +     */
    +    if (!(dataSchema.size.equals(1) &&
    +      dataSchema.fields(0).dataType.equals(StringType))) {
    +      val path = options.get("path")
    +      val tablePath: String = path match {
    +        case Some(value) => value
    +        case None => ""
    +      }
    +
    +      val carbonTableSchema: org.apache.carbondata.format.TableSchema =
    +        getTableSchema(sparkSession: SparkSession, tablePath: String)
    +      val isSchemaValid = validateSchema(carbonTableSchema, dataSchema)
    +
    +      if(!isSchemaValid) {
    +        LOGGER.error("Schema Validation Failed: streaming data schema"
    +          + "does not match with carbon table schema")
    +        throw new InvalidSchemaException("Schema Validation Failed : " +
    +          "streaming data schema does not match with carbon table schema")
    +      }
    +    }
    +    new CarbonStreamingOutputWriterFactory()
    +  }
    +
    +  /**
    +   * Read schema from existing carbon table
    +   * @param sparkSession
    +   * @param tablePath carbon table path
    +   * @return the carbon table schema read from the given table path
    +   */
    +  private def getTableSchema(
         sparkSession: SparkSession,
    -    job: Job,
    -    options: Map[String, String],
    -    dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
    +    tablePath: String): org.apache.carbondata.format.TableSchema = {
    +
    +    val formattedTablePath = tablePath.replace('\\', '/')
    +    val names = formattedTablePath.split("/")
    +    if (names.length < 3) {
    +      throw new IllegalArgumentException("invalid table path: " + tablePath)
    +    }
    +    val tableName : String = names(names.length - 1)
    +    val dbName : String = names(names.length - 2)
    +    val storePath = formattedTablePath.substring(0,
    +      formattedTablePath.lastIndexOf
    +      (((dbName.concat(CarbonCommonConstants.FILE_SEPARATOR).toString)
    +        .concat(tableName)).toString) - 1)
    +
    +    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
    +    val thriftTableInfo: org.apache.carbondata.format.TableInfo =
    +      metastore.getThriftTableInfo(new CarbonTablePath(storePath, dbName, tableName))(sparkSession)
    +
    +    val factTable: org.apache.carbondata.format.TableSchema = thriftTableInfo.getFact_table
    +    factTable
    +  }
     
       /**
    +   * Validates streamed schema against existing table schema
    +   * @param carbonTableSchema existing carbon table schema
    +   * @param dataSchema streamed data schema
    +   * @return true if schema validation is successful else false
    +   */
    +  private def validateSchema(
    +      carbonTableSchema: org.apache.carbondata.format.TableSchema,
    +      dataSchema: StructType): Boolean = {
    +
    +    import scala.collection.mutable.ListBuffer
    +    val columnnSchemaValues = carbonTableSchema.getTable_columns.asScala.sortBy(_.schemaOrdinal)
    +
    +    var columnDataTypes = new ListBuffer[String]()
    +    for(columnDataType <- columnnSchemaValues) {
    +      columnDataTypes.append(columnDataType.data_type.toString)
    +    }
    +    val tableColumnDataTypeList = columnDataTypes.toList
    +
    +    var streamSchemaDataTypes = new ListBuffer[String]()
    +    for(i <- 0 until dataSchema.size) {
    +      streamSchemaDataTypes
    +        .append(
    +          mapStreamingDataTypeToString(dataSchema.fields(i).dataType.toString))
    +    }
    +    val streamedDataTypeList = streamSchemaDataTypes.toList
    +
    +    val isValid = tableColumnDataTypeList == streamedDataTypeList
    +    isValid
    +  }
    +
    +  /**
    +   * Parses streamed datatype according to carbon datatype
    +   * @param dataType
    +   * @return String
    +   */
    +  def mapStreamingDataTypeToString(dataType: String): String = {
    +    import org.apache.carbondata.format.DataType
    +    dataType match {
    +      case "IntegerType" => DataType.INT.toString
    +      case "StringType" => DataType.STRING.toString
    +      case "DateType" => DataType.DATE.toString
    +      case "DoubleType" => DataType.DOUBLE.toString
    +      case "FloatType" => DataType.DOUBLE.toString
    +      case "LongType" => DataType.LONG.toString
    +      case "ShortType" => DataType.SHORT.toString
    +      case "TimestampType" => DataType.TIMESTAMP.toString
    +    }
    +  }
    +
    +  /**
    +   * Validates if given table exists or throws exception
    +   * @param tablePath existing carbon table path
    +   * @return None
    +   */
    +  private def validateTable(tablePath: String): Unit = {
    +
    +    val formattedTablePath = tablePath.replace('\\', '/')
    +    val names = formattedTablePath.split("/")
    +    if (names.length < 3) {
    +      throw new IllegalArgumentException("invalid table path: " + tablePath)
    +    }
    +    val tableName : String = names(names.length - 1)
    +    val dbName : String = names(names.length - 2)
    +    val storePath = formattedTablePath.substring(0,
    +      formattedTablePath.lastIndexOf
    +      (((dbName.concat(CarbonCommonConstants.FILE_SEPARATOR).toString)
    +        .concat(tableName)).toString) - 1)
    +    val absoluteTableIdentifier: AbsoluteTableIdentifier =
    +      new AbsoluteTableIdentifier(storePath,
    +        new CarbonTableIdentifier(dbName, tableName,
    +          UUID.randomUUID().toString))
    +
    +    if (!checkIfTableExists(absoluteTableIdentifier)) {
    +      throw new NoSuchTableException(dbName, tableName)
    +    }
    +  }
    +
    +  /**
    +   * Checks if table exists by checking its schema file
    +   * @param absoluteTableIdentifier
    +   * @return Boolean
    +   */
    +  private def checkIfTableExists(absoluteTableIdentifier: AbsoluteTableIdentifier): Boolean = {
    +    val carbonTablePath: CarbonTablePath = CarbonStorePath
    +      .getCarbonTablePath(absoluteTableIdentifier)
    +    val schemaFilePath: String = carbonTablePath.getSchemaFilePath
    +    FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) ||
    +      FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) ||
    +      FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)
    +  }
    +
    +  /**
    +   * If user wants to stream data from carbondata table source
    +   * and if following conditions are true:
    +   *    1. No schema provided by the user in readStream()
    +   *    2. spark.sql.streaming.schemaInference is set to true
    +   * carbondata can infer a table schema from a valid table path
    +   * The schema inference is not mandatory, but good to have.
        * When possible, this method should return the schema of the given `files`.  When the format
        * does not support inference, or no valid files are given, it should return None.  In these cases
        * Spark will require that the user specify the schema manually.
        */
    -  def inferSchema(
    -    sparkSession: SparkSession,
    -    options: Map[String, String],
    -    files: Seq[FileStatus]): Option[StructType] = Some(new StructType().add("value", StringType))
    +  override def inferSchema(
    +                   sparkSession: SparkSession,
    --- End diff --
    
    correct the indentation


---

Mime
View raw message