carbondata-issues mailing list archives

From aniketadnaik <...@git.apache.org>
Subject [GitHub] carbondata pull request #1352: [CARBONDATA-1174] Streaming Ingestion - schem...
Date Thu, 14 Sep 2017 07:13:22 GMT
Github user aniketadnaik commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1352#discussion_r138812980
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
    @@ -205,19 +220,188 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
      * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
      */
       def prepareWrite(
    -    sparkSession: SparkSession,
    -    job: Job,
    -    options: Map[String, String],
    -    dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
    +      sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
     
    -/**
    - * When possible, this method should return the schema of the given `files`.  When the format
    - * does not support inference, or no valid files are given should return None.  In these cases
    - * Spark will require that user specify the schema manually.
    - */
    +    // Check if table with given path exists
    +    validateTable(options.get("path").get)
    +
    +    // Check if the streaming data schema matches the carbon table schema.
    +    // Data from a socket source does not have a schema attached to it;
    +    // the following check skips schema validation for the socket source.
    +    if (!(dataSchema.size.equals(1) &&
    +      dataSchema.fields(0).dataType.equals(StringType))) {
    +      val tablePath = options.get("path")
    +      val path: String = tablePath match {
    +        case Some(value) => value
    +        case None => ""
    +      }
    +      val schema: TableInfo = getTableSchema(sparkSession, path)
    +      val isSchemaValid = validateSchema(schema, dataSchema)
    +
    +      if (!isSchemaValid) {
    +        LOGGER.error("Schema Validation Failed: streaming data schema"
    +          + " does not match with carbon table schema")
    +        throw new InvalidSchemaException("Schema Validation Failed : " +
    +          "streaming data schema does not match with carbon table schema")
    +      }
    +    }
    +    new CarbonStreamingOutputWriterFactory()
    +  }
    +
    +  /**
    +   * Reads the schema of an existing carbon table
    +   * @param sparkSession current spark session
    +   * @param tablePath carbon table path
    +   * @return TableInfo read from the table's schema file
    +   */
    +  private def getTableSchema(sparkSession: SparkSession, tablePath: String): TableInfo = {
    +    val meta: CarbonMetastore = new CarbonMetastore(sparkSession.conf, tablePath)
    +    val schemaPath = tablePath + "/Metadata/schema"
    +    val schema: TableInfo = meta.readSchemaFile(schemaPath)
    +    schema
    +  }
    +
    +  /**
    +   * Validates streamed schema against existing table schema
    +   * @param schema existing carbon table schema
    +   * @param dataSchema streamed data schema
    +   * @return true if schema validation is successful else false
    +   */
    +  private def validateSchema(schema: TableInfo, dataSchema: StructType): Boolean = {
    +    val factTable: TableSchema = schema.getFact_table
    +
    +    import scala.collection.mutable.ListBuffer
    +    import scala.collection.JavaConverters._
    +    val columnSchemaValues = factTable.getTable_columns.asScala.sortBy(_.schemaOrdinal)
    +
    +    val columnDataTypes = new ListBuffer[String]()
    +    for (columnDataType <- columnSchemaValues) {
    +      columnDataTypes.append(columnDataType.data_type.toString)
    +    }
    +    val tableColumnDataTypeList = columnDataTypes.toList
    +
    +    val streamSchemaDataTypes = new ListBuffer[String]()
    +    for (i <- 0 until dataSchema.size) {
    +      streamSchemaDataTypes
    +        .append(
    +          mapStreamingDataTypeToString(dataSchema.fields(i).dataType.toString))
    +    }
    +    val streamedDataTypeList = streamSchemaDataTypes.toList
    +
    +    val isValid = tableColumnDataTypeList == streamedDataTypeList
    +    isValid
    +  }
    +
    +  /**
    +   * Maps a streamed (spark) datatype to the corresponding carbon datatype string
    +   * @param dataType spark datatype name, e.g. "IntegerType"
    +   * @return carbon datatype as a String
    +   */
    +  def mapStreamingDataTypeToString(dataType: String): String = {
    +    dataType match {
    +      case "IntegerType" => DataType.INT.toString
    +      case "StringType" => DataType.STRING.toString
    +      case "DateType" => DataType.DATE.toString
    +      case "DoubleType" => DataType.DOUBLE.toString
    +      case "FloatType" => DataType.DOUBLE.toString
    +      case "LongType" => DataType.LONG.toString
    +      case "ShortType" => DataType.SHORT.toString
    +      case "TimestampType" => DataType.TIMESTAMP.toString
    +      // Fail fast on unsupported spark datatypes instead of raising a MatchError
    +      case other => throw new InvalidSchemaException("Unsupported data type: " + other)
    +    }
    +  }
    +
    +  /**
    +   * Validates that a table exists at the given path, otherwise throws an exception
    +   * @param tablePath existing carbon table path
    +   */
    +  private def validateTable(tablePath: String): Unit = {
    +
    +    val formattedTablePath = tablePath.replace('\\', '/')
    +    val names = formattedTablePath.split("/")
    +    if (names.length < 3) {
    +      throw new IllegalArgumentException("invalid table path: " + tablePath)
    +    }
    +    val tableName : String = names(names.length - 1)
    +    val dbName : String = names(names.length - 2)
    +    val storePath = formattedTablePath.substring(0,
    +      formattedTablePath.lastIndexOf(
    +        dbName + CarbonCommonConstants.FILE_SEPARATOR + tableName) - 1)
    +    val absoluteTableIdentifier: AbsoluteTableIdentifier =
    +      new AbsoluteTableIdentifier(storePath,
    +        new CarbonTableIdentifier(dbName, tableName,
    +          UUID.randomUUID().toString))
    +
    +    if (!checkIfTableExists(absoluteTableIdentifier)) {
    +      throw new NoSuchTableException(dbName, tableName)
    +    }
    +  }
    +
    +  /**
    +   * Checks whether a table exists by checking for its schema file
    +   * @param absoluteTableIdentifier identifier of the table to check
    +   * @return true if the schema file exists on LOCAL, HDFS or VIEWFS storage
    +   */
    +  private def checkIfTableExists(absoluteTableIdentifier: AbsoluteTableIdentifier): Boolean = {
    +    val carbonTablePath: CarbonTablePath = CarbonStorePath
    +      .getCarbonTablePath(absoluteTableIdentifier)
    +    val schemaFilePath: String = carbonTablePath.getSchemaFilePath
    +    FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) ||
    +      FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) ||
    +      FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)
    +  }
    +
    +  /**
    +   * If the user wants to stream data from a carbondata table source
    +   * and the following conditions are true:
    +   *    1. No schema is provided by the user in readStream()
    +   *    2. spark.sql.streaming.schemaInference is set to true
    +   * then carbondata can infer the table schema from a valid table path.
    +   * The schema inference is not mandatory, but good to have.
    +   * When possible, this method should return the schema of the given `files`. When the format
    +   * does not support inference, or no valid files are given, it should return None. In these
    +   * cases Spark will require that the user specify the schema manually.
    +   */
       def inferSchema(
    --- End diff --
    
    This is used in the read path. It is called from DataSource -> sourceSchema() -> getOrInferFileFormatSchema()
    -> format.inferSchema(). We don't have read path support ready yet, but if a user wants
    to stream data from "carbondata" as an input source (readStream.format("carbondata")), carbondata may
    have to provide inferSchema() when spark.sql.streaming.schemaInference is set to true and no external
    schema is provided. Again, this is not mandatory functionality, but good to have; a rough sketch of what it could look like is below.
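    
    For illustration only, a minimal sketch (not part of this PR) of what such a read-path inferSchema could look like, reusing the getTableSchema helper and reversing the datatype mapping from the diff above. The mapCarbonTypeToSpark helper, the column_name accessor and the exact imports are assumptions for the sketch, not existing carbondata API:
    
        // Illustration only: a possible read-path inferSchema, assuming the same
        // <tablePath>/Metadata/schema layout that prepareWrite validates against.
        import org.apache.hadoop.fs.FileStatus
        import org.apache.spark.sql.SparkSession
        import org.apache.spark.sql.types.{DataType => SparkDataType, _}
    
        // Reverse of mapStreamingDataTypeToString; falls back to StringType for unmapped types.
        private def mapCarbonTypeToSpark(carbonType: String): SparkDataType = carbonType match {
          case "INT"       => IntegerType
          case "STRING"    => StringType
          case "DATE"      => DateType
          case "DOUBLE"    => DoubleType
          case "LONG"      => LongType
          case "SHORT"     => ShortType
          case "TIMESTAMP" => TimestampType
          case _           => StringType
        }
    
        def inferSchema(
            sparkSession: SparkSession,
            options: Map[String, String],
            files: Seq[FileStatus]): Option[StructType] = {
          import scala.collection.JavaConverters._
          options.get("path").map { tablePath =>
            // Reuse the write-path helper to read TableInfo from the table's schema file.
            val tableInfo = getTableSchema(sparkSession, tablePath)
            val columns = tableInfo.getFact_table.getTable_columns.asScala.sortBy(_.schemaOrdinal)
            StructType(columns.map { col =>
              // column_name accessor is assumed here; nullability is a placeholder.
              StructField(col.column_name, mapCarbonTypeToSpark(col.data_type.toString), nullable = true)
            })
          }
        }
    
    Complex and decimal types are left out of the sketch; a real implementation would need to cover them and decide nullability per column.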


---
