carbondata-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jackylk <...@git.apache.org>
Subject [GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...
Date Wed, 27 Sep 2017 13:33:50 GMT
GitHub user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r141343541
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
    @@ -217,22 +229,212 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
        * be put here.  For example, user defined output committer can be configured here
        * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
        */
    -  def prepareWrite(
    +  override def prepareWrite(
    +      sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
    +
    +    // Check if table with given path exists
    +    validateTable(options.get("path").get)
    +
    +    /* Check id streaming data schema matches with carbon table schema
    +     * Data from socket source does not have schema attached to it,
    +     * Following check is to ignore schema validation for socket source.
    +     */
    +    if (!(dataSchema.size.equals(1) &&
    +      dataSchema.fields(0).dataType.equals(StringType))) {
    +      val path = options.get("path")
    +      val tablePath: String = path match {
    +        case Some(value) => value
    +        case None => ""
    +      }
    +
    +      val carbonTableSchema: org.apache.carbondata.format.TableSchema =
    +        getTableSchema(sparkSession: SparkSession, tablePath: String)
    +      val isSchemaValid = validateSchema(carbonTableSchema, dataSchema)
    +
    +      if(!isSchemaValid) {
    +        LOGGER.error("Schema Validation Failed: streaming data schema"
    +          + "does not match with carbon table schema")
    +        throw new InvalidSchemaException("Schema Validation Failed : " +
    +          "streaming data schema does not match with carbon table schema")
    +      }
    +    }
    +    new CarbonStreamingOutputWriterFactory()
    +  }
    +
    +  /**
    +   * Read schema from existing carbon table
    +   * @param sparkSession
    +   * @param tablePath carbon table path
    +   * @return true if schema validation is successful else false
    +   */
    +  private def getTableSchema(
         sparkSession: SparkSession,
    -    job: Job,
    -    options: Map[String, String],
    -    dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
    +    tablePath: String): org.apache.carbondata.format.TableSchema = {
    +
    +    val formattedTablePath = tablePath.replace('\\', '/')
    +    val names = formattedTablePath.split("/")
    +    if (names.length < 3) {
    +      throw new IllegalArgumentException("invalid table path: " + tablePath)
    +    }
    +    val tableName : String = names(names.length - 1)
    +    val dbName : String = names(names.length - 2)
    +    val storePath = formattedTablePath.substring(0,
    +      formattedTablePath.lastIndexOf
    +      (((dbName.concat(CarbonCommonConstants.FILE_SEPARATOR).toString)
    --- End diff --
    
    Remove the unnecessary parentheses and the redundant `toString` call: `concat` already returns a `String`.


---

Mime
View raw message