carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [50/50] [abbrv] carbondata git commit: [CARBONDATA-1174] Streaming Ingestion - schema validation and streaming examples
Date Tue, 10 Oct 2017 03:08:37 GMT
[CARBONDATA-1174] Streaming Ingestion - schema validation and streaming examples

1.schema validation of input data if it's from a file source when schema is specified.
2.added streaming examples - for file stream and socket stream sources

This closes #1352


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/08a82b59
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/08a82b59
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/08a82b59

Branch: refs/heads/streaming_ingest
Commit: 08a82b59b409ca3e1881e9da7bc6d8a760b1d0d4
Parents: 441907e
Author: Aniket Adnaik <aniket.adnaik@gmail.com>
Authored: Thu Jun 15 11:57:43 2017 -0700
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Tue Oct 10 11:06:52 2017 +0800

----------------------------------------------------------------------
 .../streaming/CarbonStreamingCommitInfo.java    |   6 +-
 ...CarbonStreamingIngestFileSourceExample.scala | 146 +++++++++++++
 ...rbonStreamingIngestSocketSourceExample.scala | 160 ++++++++++++++
 .../examples/utils/StreamingExampleUtil.scala   | 145 +++++++++++++
 .../org/apache/spark/sql/CarbonSource.scala     | 210 +++++++++++++++++--
 .../CarbonStreamingOutpurWriteFactory.scala     |  88 --------
 .../CarbonStreamingOutputWriteFactory.scala     |  88 ++++++++
 .../CarbonSourceSchemaValidationTest.scala      |  61 ++++++
 8 files changed, 801 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/08a82b59/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
index 6cf303a..2027566 100644
--- a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
@@ -36,7 +36,7 @@ public class CarbonStreamingCommitInfo {
 
   private long batchID;
 
-  private String fileOffset;
+  private long fileOffset;
 
   private long transactionID;     // future use
 
@@ -67,6 +67,8 @@ public class CarbonStreamingCommitInfo {
     this.batchID = batchID;
 
     this.transactionID = -1;
+
+    this.fileOffset = 0;
   }
 
   public String getDataBase() {
@@ -93,7 +95,7 @@ public class CarbonStreamingCommitInfo {
     return batchID;
   }
 
-  public String getFileOffset() {
+  public long getFileOffset() {
     return fileOffset;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/08a82b59/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala
new file mode 100644
index 0000000..ebe0a5c
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.spark.sql.streaming.ProcessingTime
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.examples.utils.{StreamingExampleUtil}
+
+/**
+ * Covers spark structured streaming scenario where user streams data
+ * from a file source (input source) and write into carbondata table(output sink).
+ * This example uses csv file as a input source and writes
+ * into target carbon table. The target carbon table must exist.
+ */
+
+object CarbonStreamingIngestFileSourceExample {
+
+  def main(args: Array[String]) {
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
+    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+    val metastoredb = s"$rootPath/examples/spark2/target"
+    val csvDataDir = s"$rootPath/examples/spark2/resources/csvDataDir"
+    val streamTableName = s"_carbon_file_stream_table_"
+    val streamTablePath = s"$storeLocation/default/$streamTableName"
+    val ckptLocation = s"$rootPath/examples/spark2/resources/ckptDir"
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    // cleanup residual files, if any
+    StreamingExampleUtil.cleanUpDir(csvDataDir, ckptLocation)
+
+    import org.apache.spark.sql.CarbonSession._
+    val spark = SparkSession
+      .builder()
+      .master("local")
+      .appName("CarbonFileStreamingExample")
+      .config("spark.sql.warehouse.dir", warehouse)
+      .getOrCreateCarbonSession(storeLocation, metastoredb)
+
+    spark.sparkContext.setLogLevel("ERROR")
+
+    // Writes Dataframe to CarbonData file:
+    import spark.implicits._
+    import org.apache.spark.sql.types._
+    // drop table if exists previously
+    spark.sql(s"DROP TABLE IF EXISTS $streamTableName")
+    // Create target carbon table
+    spark.sql(
+      s"""
+         | CREATE TABLE $streamTableName(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT
+         | )
+         | STORED BY 'carbondata'""".stripMargin)
+
+    // Generate CSV data and write to CSV file
+    StreamingExampleUtil.generateCSVDataFile(spark, 1, csvDataDir, SaveMode.Overwrite)
+
+    // scalastyle:off
+    spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$csvDataDir'
+         | INTO TABLE $streamTableName
+         | OPTIONS('FILEHEADER'='id,name,city,salary'
+         | )""".stripMargin)
+    // scalastyle:on
+
+    // check initial table data
+    spark.sql(s""" SELECT * FROM $streamTableName """).show()
+
+    // define custom schema
+    val inputSchema = new StructType().
+      add("id", "integer").
+      add("name", "string").
+      add("city", "string").
+      add("salary", "float")
+
+    // setup csv file as a input streaming source
+    val csvReadDF = spark.readStream.
+      format("csv").
+      option("sep", ",").
+      schema(inputSchema).
+      option("path", csvDataDir).
+      option("header", "true").
+      load()
+
+    // Write data from csv format streaming source to carbondata target format
+    // set trigger to every 1 second
+    val qry = csvReadDF.writeStream
+      .format("carbondata")
+      .trigger(ProcessingTime("1 seconds"))
+      .option("checkpointLocation", ckptLocation)
+      .option("path", streamTablePath)
+      .start()
+
+    // In a separate thread append data every 2 seconds to existing csv
+    val gendataThread: Thread = new Thread() {
+      override def run(): Unit = {
+        for (i <- 1 to 5) {
+          Thread.sleep(2)
+          StreamingExampleUtil.
+            generateCSVDataFile(spark, i * 10 + 1, csvDataDir, SaveMode.Append)
+        }
+      }
+    }
+    gendataThread.start()
+    gendataThread.join()
+
+    // stop streaming execution after 5 sec delay
+    Thread.sleep(5000)
+    qry.stop()
+
+    // verify streaming data is added into the table
+    spark.sql(s""" SELECT * FROM $streamTableName """).show()
+
+    // Cleanup residual files and table data
+    StreamingExampleUtil.cleanUpDir(csvDataDir, ckptLocation)
+    spark.sql(s"DROP TABLE IF EXISTS $streamTableName")
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/08a82b59/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestSocketSourceExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestSocketSourceExample.scala
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestSocketSourceExample.scala
new file mode 100644
index 0000000..bbf7ef2
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestSocketSourceExample.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.spark.sql.streaming.ProcessingTime
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.examples.utils.StreamingExampleUtil
+
+
+/**
+ * This example reads stream data from socket source (input) and write into
+ * existing carbon table(output).
+ *
+ * It uses localhost and port (9999) to create a socket and write to it.
+ * Examples uses two threads, one to write data to the socket and another thread
+ * to receive data from socket and write into carbon table.
+ */
+
+// scalastyle:off println
+object CarbonStreamingIngestSocketSourceExample {
+
+  def main(args: Array[String]) {
+
+    // setup localhost and port number
+    val host = "localhost"
+    val port = 9999
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
+    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+    val metastoredb = s"$rootPath/examples/spark2/target"
+    val csvDataDir = s"$rootPath/examples/spark2/resources/csvDataDir"
+    val streamTableName = s"_carbon_socket_stream_table_"
+    val streamTablePath = s"$storeLocation/default/$streamTableName"
+    val ckptLocation = s"$rootPath/examples/spark2/resources/ckptDir"
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    // cleanup residual files, if any
+    StreamingExampleUtil.cleanUpDir(csvDataDir, ckptLocation)
+
+    import org.apache.spark.sql.CarbonSession._
+    val spark = SparkSession
+      .builder()
+      .master("local[2]")
+      .appName("CarbonNetworkStreamingExample")
+      .config("spark.sql.warehouse.dir", warehouse)
+      .getOrCreateCarbonSession(storeLocation, metastoredb)
+
+    spark.sparkContext.setLogLevel("ERROR")
+
+    // Writes Dataframe to CarbonData file:
+    import spark.implicits._
+
+    // drop table if exists previously
+    spark.sql(s"DROP TABLE IF EXISTS ${streamTableName}")
+
+    // Create target carbon table and populate with initial data
+    spark.sql(
+      s"""
+         | CREATE TABLE ${streamTableName}(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT
+         | )
+         | STORED BY 'carbondata'""".stripMargin)
+
+    // Generate CSV data and write to CSV file
+    StreamingExampleUtil.generateCSVDataFile(spark, 1, csvDataDir, SaveMode.Overwrite)
+
+    // load the table
+    // scalastyle:off
+    spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$csvDataDir'
+         | INTO TABLE ${streamTableName}
+         | OPTIONS('FILEHEADER'='id,name,city,salary'
+         | )""".stripMargin)
+
+
+    spark.sql(s""" SELECT * FROM ${streamTableName} """).show()
+
+    // Create server socket in main thread
+    val serverSocket = StreamingExampleUtil.createserverSocket(host, port)
+
+    // Start client thread to receive streaming data and write into carbon
+    val streamWriterThread: Thread = new Thread() {
+      override def run(): Unit= {
+
+        try {
+          // Setup read stream to read input data from socket
+          val readSocketDF = spark.readStream
+            .format("socket")
+            .option("host", host)
+            .option("port", port)
+            .load()
+
+          // Write data from socket stream to carbondata file
+          val qry = readSocketDF.writeStream
+            .format("carbondata")
+            .trigger(ProcessingTime("2 seconds"))
+            .option("checkpointLocation", ckptLocation)
+            .option("path", streamTablePath)
+            .start()
+
+          qry.awaitTermination()
+        } catch {
+          case e: InterruptedException => println("Done reading and writing streaming data")
+        }
+      }
+    }
+    streamWriterThread.start()
+
+    // wait for the client's connection request and accept it
+    val clientSocket = StreamingExampleUtil.waitToForClientConnection(serverSocket.get)
+
+    // Write to client's connected socket every 2 seconds, for 5 times
+    StreamingExampleUtil.writeToSocket(clientSocket, 5, 2, 11)
+
+    Thread.sleep(2000)
+    // interrupt client thread to stop streaming query
+    streamWriterThread.interrupt()
+    //wait for client thread to finish
+    streamWriterThread.join()
+
+    //Close the server socket
+    serverSocket.get.close()
+
+    // verify streaming data is added into the table
+    // spark.sql(s""" SELECT * FROM ${streamTableName} """).show()
+
+    // Cleanup residual files and table data
+    StreamingExampleUtil.cleanUpDir(csvDataDir, ckptLocation)
+    spark.sql(s"DROP TABLE IF EXISTS ${streamTableName}")
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/08a82b59/examples/spark2/src/main/scala/org/apache/carbondata/examples/utils/StreamingExampleUtil.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/utils/StreamingExampleUtil.scala
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/utils/StreamingExampleUtil.scala
new file mode 100644
index 0000000..6eab491
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/utils/StreamingExampleUtil.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples.utils
+
+import java.io.{IOException, PrintWriter}
+import java.net.{ServerSocket, Socket}
+
+import scala.tools.nsc.io.Path
+
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+
+
+/**
+ * Utility functions for streaming ingest examples
+ */
+
+// scalastyle:off println
+object StreamingExampleUtil {
+
+  // Clean up directories recursively, accepts variable arguments
+  def cleanUpDir(dirPaths: String*): Unit = {
+
+      // if (args.length < 1) {
+    if (dirPaths.size < 1) {
+      System.err.println("Usage: StreamingCleanupUtil <dirPath> [dirpath]...")
+      System.exit(1)
+    }
+
+    var i = 0
+    while (i < dirPaths.size) {
+      try {
+        val path: Path = Path(dirPaths(i))
+        path.deleteRecursively()
+      } catch {
+        case ioe: IOException => println("IO Exception while deleting files recursively" + ioe)
+      }
+      i = i + 1
+    }
+  }
+
+  // Generates csv data and write to csv files at given path
+  def generateCSVDataFile(spark: SparkSession,
+                        idStart: Int,
+                        csvDirPath: String,
+                        saveMode: SaveMode): Unit = {
+    // Create csv data frame file
+    val csvRDD = spark.sparkContext.parallelize(1 to 10)
+      .map(id => (id, "name_ABC", "city_XYZ", 10000.00*id))
+      val csvDataDF = spark.createDataFrame(csvRDD).toDF("id", "name", "city", "salary")
+
+
+    csvDataDF.write
+      .option("header", "false")
+      .mode(saveMode)
+      .csv(csvDirPath)
+  }
+
+  // Generates csv data frame and returns to caller
+  def generateCSVDataDF(spark: SparkSession,
+                        idStart: Int): DataFrame = {
+    // Create csv data frame file
+    val csvRDD = spark.sparkContext.parallelize(1 to 10)
+      .map(id => (id, "name_ABC", "city_XYZ", 10000.00*id))
+    val csvDataDF = spark.createDataFrame(csvRDD).toDF("id", "name", "city", "salary")
+    csvDataDF
+  }
+
+  // Create server socket for socket streaming source
+  def createserverSocket(host: String, port: Int): Option[ServerSocket] = {
+    try {
+      Some(new ServerSocket(port))
+    } catch {
+      case e: java.net.ConnectException =>
+        println("Error Connecting to" + host + ":" + port, e)
+        None
+    }
+  }
+
+  // Create server socket for socket streaming source
+  def waitToForClientConnection(serverSocket: ServerSocket): Socket = {
+    serverSocket.accept()
+  }
+
+  // Create server socket for socket streaming source
+  def closeServerSocket(serverSocket: ServerSocket): Unit = {
+    serverSocket.close()
+  }
+
+  // write periodically on given socket
+  def writeToSocket(clientSocket: Socket,
+                   iterations: Int,
+                   delay: Int,
+                   startID: Int): Unit = {
+
+    var nItr = 10
+    var nDelay = 5
+
+    // iterations range check
+    if (iterations >= 1 || iterations <= 50) {
+      nItr = iterations
+    } else {
+      println("Number of iterations exceeds limit. Setting to default 10 iterations")
+    }
+
+    // delay range check (1 second to 60 seconds)
+    if (delay >= 1 || delay <= 60) {
+      nDelay = delay
+    } else {
+      println("Delay exceeds the limit. Setting it to default 2 seconds")
+    }
+
+    val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+
+    var j = startID
+
+    for (i <- startID to startID + nItr) {
+      // write 5 records per iteration
+      for (id <- j to j + 5 ) {
+        socketWriter.println(id.toString + ", name_" + i
+          + ", city_" + i + ", " + (i*10000.00).toString)
+      }
+      j = j + 5
+      socketWriter.flush()
+      Thread.sleep(nDelay*1000)
+    }
+    socketWriter.close()
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/08a82b59/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index d496de2..6eacb19 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -17,14 +17,20 @@
 
 package org.apache.spark.sql
 
+import java.io.{BufferedWriter, FileWriter, IOException}
+import java.util.UUID
+
 import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.language.implicitConversions
 
 import org.apache.commons.lang.StringUtils
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.Job
+import org.apache.parquet.schema.InvalidSchemaException
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor}
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory}
 import org.apache.spark.sql.execution.strategy.CarbonLateDecodeStrategy
@@ -33,10 +39,12 @@ import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.CarbonStreamingOutputWriterFactory
-import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.sql.types._
 
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
@@ -44,7 +52,6 @@ import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
-
 /**
  * Carbon relation provider compliant to data source api.
  * Creates carbon relations
@@ -52,7 +59,11 @@ import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 class CarbonSource extends CreatableRelationProvider with RelationProvider
   with SchemaRelationProvider with DataSourceRegister with FileFormat  {
 
-  override def shortName(): String = "carbondata"
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  override def shortName(): String = {
+    "carbondata"
+  }
 
   // will be called if hive supported create table command is provided
   override def createRelation(sqlContext: SQLContext,
@@ -181,8 +192,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
 
   /**
    * Returns the path of the table
-   *
-     * @param sparkSession
+   * @param sparkSession
    * @param dbName
    * @param tableName
    * @return
@@ -217,22 +227,196 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
    * be put here.  For example, user defined output committer can be configured here
    * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
    */
-  def prepareWrite(
+  override def prepareWrite(
     sparkSession: SparkSession,
     job: Job,
     options: Map[String, String],
-    dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
+    dataSchema: StructType): OutputWriterFactory = {
+
+    // Check if table with given path exists
+    // validateTable(options.get("path").get)
+    validateTable(options("path"))
+
+    /* Check id streaming data schema matches with carbon table schema
+     * Data from socket source does not have schema attached to it,
+     * Following check is to ignore schema validation for socket source.
+     */
+    if (!(dataSchema.size.equals(1) &&
+      dataSchema.fields(0).dataType.equals(StringType))) {
+      val path = options.get("path")
+      val tablePath: String = path match {
+        case Some(value) => value
+        case None => ""
+      }
+
+      val carbonTableSchema: org.apache.carbondata.format.TableSchema =
+        getTableSchema(sparkSession: SparkSession, tablePath: String)
+      val isSchemaValid = validateSchema(carbonTableSchema, dataSchema)
+
+      if(!isSchemaValid) {
+        LOGGER.error("Schema Validation Failed: streaming data schema"
+          + "does not match with carbon table schema")
+        throw new InvalidSchemaException("Schema Validation Failed : " +
+          "streaming data schema does not match with carbon table schema")
+      }
+    }
+    new CarbonStreamingOutputWriterFactory()
+  }
 
   /**
-   * When possible, this method should return the schema of the given `files`.  When the format
-   * does not support inference, or no valid files are given should return None.  In these cases
-   * Spark will require that user specify the schema manually.
+   * Read schema from existing carbon table
+   * @param sparkSession
+   * @param tablePath carbon table path
+   * @return TableSchema read from provided table path
    */
-  def inferSchema(
+  private def getTableSchema(
+    sparkSession: SparkSession,
+    tablePath: String): org.apache.carbondata.format.TableSchema = {
+
+    val formattedTablePath = tablePath.replace('\\', '/')
+    val names = formattedTablePath.split("/")
+    if (names.length < 3) {
+      throw new IllegalArgumentException("invalid table path: " + tablePath)
+    }
+    val tableName : String = names(names.length - 1)
+    val dbName : String = names(names.length - 2)
+    val storePath = formattedTablePath.substring(0,
+      formattedTablePath.lastIndexOf
+      (dbName.concat(CarbonCommonConstants.FILE_SEPARATOR)
+        .concat(tableName)) - 1)
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val thriftTableInfo: org.apache.carbondata.format.TableInfo =
+      metastore.getThriftTableInfo(new CarbonTablePath(storePath, dbName, tableName))(sparkSession)
+
+    val factTable: org.apache.carbondata.format.TableSchema = thriftTableInfo.getFact_table
+    factTable
+  }
+
+  /**
+   * Validates streamed schema against existing table schema
+   * @param carbonTableSchema existing carbon table schema
+   * @param dataSchema streamed data schema
+   * @return true if schema validation is successful else false
+   */
+  private def validateSchema(
+      carbonTableSchema: org.apache.carbondata.format.TableSchema,
+      dataSchema: StructType): Boolean = {
+
+    val columnnSchemaValues = carbonTableSchema.getTable_columns
+      .asScala.sortBy(_.schemaOrdinal)
+
+    var columnDataTypes = new ListBuffer[String]()
+    columnnSchemaValues.foreach(columnDataType =>
+      columnDataTypes.append(columnDataType.data_type.toString))
+    val tableColumnDataTypeList = columnDataTypes.toList
+
+    var streamSchemaDataTypes = new ListBuffer[String]()
+    dataSchema.fields.foreach(item => streamSchemaDataTypes
+      .append(mapStreamingDataTypeToString(item.dataType.toString)))
+    val streamedDataTypeList = streamSchemaDataTypes.toList
+
+    val isValid = tableColumnDataTypeList == streamedDataTypeList
+    isValid
+  }
+
+  /**
+   * Maps streamed datatype to carbon datatype
+   * @param dataType
+   * @return String
+   */
+  def mapStreamingDataTypeToString(dataType: String): String = {
+    import org.apache.carbondata.format.DataType
+    dataType match {
+      case "IntegerType" => DataType.INT.toString
+      case "StringType" => DataType.STRING.toString
+      case "DateType" => DataType.DATE.toString
+      case "DoubleType" => DataType.DOUBLE.toString
+      case "FloatType" => DataType.DOUBLE.toString
+      case "LongType" => DataType.LONG.toString
+      case "ShortType" => DataType.SHORT.toString
+      case "TimestampType" => DataType.TIMESTAMP.toString
+    }
+  }
+
+  /**
+   * Validates if given table exists or throws exception
+   * @param tablePath existing carbon table path
+   * @return None
+   */
+  private def validateTable(tablePath: String): Unit = {
+
+    val formattedTablePath = tablePath.replace('\\', '/')
+    val names = formattedTablePath.split("/")
+    if (names.length < 3) {
+      throw new IllegalArgumentException("invalid table path: " + tablePath)
+    }
+    val tableName : String = names(names.length - 1)
+    val dbName : String = names(names.length - 2)
+    val storePath = formattedTablePath.substring(0,
+      formattedTablePath.lastIndexOf
+      (((dbName.concat(CarbonCommonConstants.FILE_SEPARATOR).toString)
+        .concat(tableName)).toString) - 1)
+    val absoluteTableIdentifier: AbsoluteTableIdentifier =
+      new AbsoluteTableIdentifier(storePath,
+        new CarbonTableIdentifier(dbName, tableName,
+          UUID.randomUUID().toString))
+
+    if (!checkIfTableExists(absoluteTableIdentifier)) {
+      throw new NoSuchTableException(dbName, tableName)
+    }
+  }
+
+  /**
+   * Checks if table exists by checking its schema file
+   * @param absoluteTableIdentifier
+   * @return Boolean
+   */
+  private def checkIfTableExists(absoluteTableIdentifier: AbsoluteTableIdentifier): Boolean = {
+    val carbonTablePath: CarbonTablePath = CarbonStorePath
+      .getCarbonTablePath(absoluteTableIdentifier)
+    val schemaFilePath: String = carbonTablePath.getSchemaFilePath
+    FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) ||
+      FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) ||
+      FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)
+  }
+
+  /**
+   * If user wants to stream data from carbondata table source
+   * and if following conditions are true:
+   *    1. No schema provided by the user in readStream()
+   *    2. spark.sql.streaming.schemaInference is set to true
+   * carbondata can infer table schema from a valid table path
+   * The schema inference is not mandatory, but good to have.
+   * When possible, this method should return the schema of the given `files`.
+   * If the format does not support schema inference, or no valid files
+   * are given it should return None. In these cases Spark will require that
+   * user specify the schema manually.
+   */
+  override def inferSchema(
     sparkSession: SparkSession,
     options: Map[String, String],
-    files: Seq[FileStatus]): Option[StructType] = Some(new StructType().add("value", StringType))
+    files: Seq[FileStatus]): Option[StructType] = {
+
+    val path = options.get("path")
+    val tablePath: String = path match {
+      case Some(value) => value
+      case None => ""
+    }
+    // Check if table with given path exists
+    validateTable(tablePath)
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val carbonTableSchema: org.apache.carbondata.format.TableSchema =
+      getTableSchema(sparkSession: SparkSession, tablePath: String)
+    val columnnSchemaValues = carbonTableSchema.getTable_columns
+      .asScala.sortBy(_.schemaOrdinal)
+
+    var structFields = new ArrayBuffer[StructField]()
+    columnnSchemaValues.foreach(columnSchema =>
+      structFields.append(StructField(columnSchema.column_name,
+        CatalystSqlParser.parseDataType(columnSchema.data_type.toString), true)))
 
+    Some(new StructType(structFields.toArray))
+  }
 }
 
 object CarbonSource {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/08a82b59/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
deleted file mode 100644
index be69885..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.streaming
-
-
-import java.util.concurrent.ConcurrentHashMap
-
-import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.spark.sql.execution.datasources.OutputWriterFactory
-import org.apache.spark.sql.types.StructType
-
-import org.apache.carbondata.core.util.path.CarbonTablePath
-
-
-class CarbonStreamingOutputWriterFactory extends OutputWriterFactory {
-
- /**
-  * When writing to a [[org.apache.spark.sql.execution.datasources.HadoopFsRelation]],
-  * this method gets called by each task on executor side
-  * to instantiate new [[org.apache.spark.sql.execution.datasources.OutputWriter]]s.
-  *
-  * @param path Path to write the file.
-  * @param dataSchema Schema of the rows to be written. Partition columns are not
-  *                   included in the schema if the relation being written is
-  *                   partitioned.
-  * @param context The Hadoop MapReduce task context.
-  */
-
-  override def newInstance(
-    path: String,
-
-    dataSchema: StructType,
-
-    context: TaskAttemptContext) : CarbonStreamingOutputWriter = {
-
-        new CarbonStreamingOutputWriter(path, context)
-  }
-
-  override def getFileExtension(context: TaskAttemptContext): String = {
-
-    CarbonTablePath.STREAM_FILE_NAME_EXT
-  }
-
-}
-
-object CarbonStreamingOutpurWriterFactory {
-
-  private[this] val writers = new ConcurrentHashMap[String, CarbonStreamingOutputWriter]()
-
-  def addWriter(path: String, writer: CarbonStreamingOutputWriter): Unit = {
-
-    if (writers.contains(path)) {
-      throw new IllegalArgumentException(path + "writer already exists")
-    }
-
-    writers.put(path, writer)
-  }
-
-  def getWriter(path: String): CarbonStreamingOutputWriter = {
-
-    writers.get(path)
-  }
-
-  def containsWriter(path: String): Boolean = {
-
-    writers.containsKey(path)
-  }
-
-  def removeWriter(path: String): Unit = {
-
-    writers.remove(path)
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/08a82b59/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriteFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriteFactory.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriteFactory.scala
new file mode 100644
index 0000000..c5e4226
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriteFactory.scala
@@ -0,0 +1,88 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.streaming
+
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+
+class CarbonStreamingOutputWriterFactory extends OutputWriterFactory {
+
+ /**
+  * When writing to a [[org.apache.spark.sql.execution.datasources.HadoopFsRelation]],
+  * this method gets called by each task on the executor side
+  * to instantiate new [[org.apache.spark.sql.execution.datasources.OutputWriter]]s.
+  *
+  * @param path Path to write the file.
+  * @param dataSchema Schema of the rows to be written. Partition columns are not
+  *                   included in the schema if the relation being written is
+  *                   partitioned.
+  * @param context The Hadoop MapReduce task context.
+  */
+
+  override def newInstance(
+    path: String,
+
+    dataSchema: StructType,
+
+    context: TaskAttemptContext) : CarbonStreamingOutputWriter = {
+
+        new CarbonStreamingOutputWriter(path, context)
+  }
+
+  override def getFileExtension(context: TaskAttemptContext): String = {
+
+    CarbonTablePath.STREAM_FILE_NAME_EXT
+  }
+
+}
+
+object CarbonStreamingOutputWriterFactory {
+
+  private[this] val writers = new ConcurrentHashMap[String, CarbonStreamingOutputWriter]()
+
+  def addWriter(path: String, writer: CarbonStreamingOutputWriter): Unit = {
+
+    if (writers.contains(path)) {
+      throw new IllegalArgumentException(path + "writer already exists")
+    }
+
+    writers.put(path, writer)
+  }
+
+  def getWriter(path: String): CarbonStreamingOutputWriter = {
+
+    writers.get(path)
+  }
+
+  def containsWriter(path: String): Boolean = {
+
+    writers.containsKey(path)
+  }
+
+  def removeWriter(path: String): Unit = {
+
+    writers.remove(path)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/08a82b59/integration/spark2/src/test/scala/org/apache/spark/carbondata/streaming/CarbonSourceSchemaValidationTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/streaming/CarbonSourceSchemaValidationTest.scala
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/streaming/CarbonSourceSchemaValidationTest.scala
new file mode 100644
index 0000000..f00eea5
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/streaming/CarbonSourceSchemaValidationTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata.streaming
+
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.sql.common.util.Spark2QueryTest
+import org.apache.spark.sql.{CarbonSource, SparkSession}
+import org.apache.spark.sql.streaming.CarbonStreamingOutputWriterFactory
+import org.apache.spark.sql.test.TestQueryExecutor
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Test for schema validation during streaming ingestion.
+ * Validates the streamed (source) schema against the existing table (target) schema.
+ */
+
+class CarbonSourceSchemaValidationTest extends Spark2QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll() {
+    sql("DROP TABLE IF EXISTS _carbon_stream_table_")
+  }
+
+  test("Testing validate schema method with correct values ") {
+
+    val spark = SparkSession.builder
+      .appName("StreamIngestSchemaValidation")
+      .master("local")
+      .getOrCreate()
+
+    val carbonSource = new CarbonSource
+    val job = new Job()
+    val warehouseLocation = TestQueryExecutor.warehouse
+
+    sql("CREATE TABLE _carbon_stream_table_(id int,name string)STORED BY 'carbondata'")
+    val tablePath: String = s"$warehouseLocation/default/_carbon_stream_table_"
+    val dataSchema = StructType(Array(StructField("id", IntegerType, true), StructField("name",
StringType, true)))
+    val res = carbonSource.prepareWrite(spark, job, Map("path" -> tablePath), dataSchema)
+    assert(res.isInstanceOf[CarbonStreamingOutputWriterFactory])
+  }
+
+  override def afterAll() {
+    sql("DROP TABLE IF EXISTS _carbon_stream_table_")
+  }
+
+}


Mime
View raw message