carbondata-commits mailing list archives

From jack...@apache.org
Subject [22/50] [abbrv] carbondata git commit: [CARBONDATA-2054]Add an example: how to use CarbonData batch load to integrate with Spark Streaming
Date Wed, 31 Jan 2018 05:22:42 GMT
[CARBONDATA-2054]Add an example: how to use CarbonData batch load to integrate with Spark Streaming

Use CarbonSession.createDataFrame to convert the RDD to a DataFrame inside DStream.foreachRDD, and then
write each batch into a CarbonData table that supports auto compaction.

This closes #1840
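
For a quick read before the full patch, the essence of the new CarbonBatchSparkStreamingExample
(a condensed sketch of the file added below; the table name, port, and batch interval are the ones
used by the example, and spark is assumed to be a CarbonSession that has already been created) is:

    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // schema of the incoming comma-separated records
    case class DStreamData(id: Int, name: String, city: String, salary: Float)

    // 'spark' is an existing CarbonSession; the target table was created with
    // 'AUTO_LOAD_MERGE'='true' so the batch segments get compacted automatically
    val ssc = new StreamingContext(spark.sparkContext, Seconds(15))
    val lines = ssc.socketTextStream("localhost", 7071)

    lines.map(_.split(","))
      .map(f => DStreamData(f(0).toInt, f(1), f(2), f(3).toFloat))
      .foreachRDD { rdd =>
        // convert each micro-batch RDD to a DataFrame and append it as a batch load
        val df = SparkSession.builder().getOrCreate()
          .createDataFrame(rdd).toDF("id", "name", "city", "salary")
        df.write
          .format("carbondata")
          .option("tableName", "dstream_batch_table")
          .mode(SaveMode.Append)
          .save()
      }

    ssc.start()
    ssc.awaitTermination()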


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/09fb83ef
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/09fb83ef
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/09fb83ef

Branch: refs/heads/carbonstore
Commit: 09fb83ef0f9e57db9dcfcc68bec78849a91d9a7f
Parents: c55240d
Author: Zhang Zhichao <441586683@qq.com>
Authored: Sun Jan 21 16:36:50 2018 +0800
Committer: chenliang613 <chenliang613@huawei.com>
Committed: Fri Jan 26 21:29:33 2018 +0800

----------------------------------------------------------------------
 examples/spark2/pom.xml                         |   6 +
 .../CarbonBatchSparkStreamingExample.scala      | 216 +++++++++++++++++++
 .../CarbonStructuredStreamingExample.scala      | 215 ++++++++++++++++++
 .../carbondata/examples/ExampleUtils.scala      |  15 +-
 .../carbondata/examples/StreamExample.scala     | 215 ------------------
 .../streaming/segment/StreamSegment.java        |   2 +-
 6 files changed, 448 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/09fb83ef/examples/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/examples/spark2/pom.xml b/examples/spark2/pom.xml
index 227da7d..da39f1d 100644
--- a/examples/spark2/pom.xml
+++ b/examples/spark2/pom.xml
@@ -56,6 +56,12 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-repl_${scala.binary.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>${spark.deps.scope}</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09fb83ef/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
new file mode 100644
index 0000000..6ae87b9
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{CarbonEnv, SaveMode, SparkSession}
+import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+
+/**
+ * This example shows how to use CarbonData batch load to integrate
+ * with Spark Streaming (DStream-based, not Spark Structured Streaming).
+ */
+// scalastyle:off println
+
+case class DStreamData(id: Int, name: String, city: String, salary: Float)
+
+object CarbonBatchSparkStreamingExample {
+
+  def main(args: Array[String]): Unit = {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val checkpointPath =
+      s"$rootPath/examples/spark2/target/spark_streaming_cp_" +
+      System.currentTimeMillis().toString()
+    val streamTableName = s"dstream_batch_table"
+
+    val spark = ExampleUtils.createCarbonSession("CarbonBatchSparkStreamingExample", 4)
+
+    val requireCreateTable = true
+
+    if (requireCreateTable) {
+      // drop the table if it exists from a previous run
+      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+      // Create target carbon table and populate with initial data
+      // set AUTO_LOAD_MERGE to true to compact segments automatically
+      spark.sql(
+        s"""
+           | CREATE TABLE ${ streamTableName }(
+           | id INT,
+           | name STRING,
+           | city STRING,
+           | salary FLOAT
+           | )
+           | STORED BY 'carbondata'
+           | TBLPROPERTIES(
+           | 'sort_columns'='name',
+           | 'dictionary_include'='city',
+           | 'AUTO_LOAD_MERGE'='true',
+           | 'COMPACTION_LEVEL_THRESHOLD'='4,10')
+           | """.stripMargin)
+
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
+      val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+      // batch load
+      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+         """.stripMargin)
+
+      // streaming ingest
+      val serverSocket = new ServerSocket(7071)
+      val thread1 = writeSocket(serverSocket)
+      val thread2 = showTableCount(spark, streamTableName)
+      val ssc = startStreaming(spark, streamTableName, tablePath, checkpointPath)
+      // wait for stop signal to stop Spark Streaming App
+      waitForStopSignal(ssc)
+      // the Spark Streaming app needs to be started in the main thread,
+      // otherwise it will hit a not-serializable exception.
+      ssc.start()
+      ssc.awaitTermination()
+      thread1.interrupt()
+      thread2.interrupt()
+      serverSocket.close()
+    }
+
+    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
+
+    // record(id = 100000001) comes from batch segment_0
+    // record(id = 1) comes from stream segment_1
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+    // range filter query
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id < 10 limit 100").show(100, truncate = false)
+
+    // show segments
+    spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
+
+    // drop table
+    spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+  def showTableCount(spark: SparkSession, tableName: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        for (_ <- 0 to 1000) {
+          spark.sql(s"select count(*) from $tableName").show(truncate = false)
+          spark.sql(s"SHOW SEGMENTS FOR TABLE ${tableName}").show(false)
+          Thread.sleep(1000 * 5)
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def waitForStopSignal(ssc: StreamingContext): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // use command 'nc 127.0.0.1 7072' to stop Spark Streaming App
+        new ServerSocket(7072).accept()
+        // don't stop SparkContext here
+        ssc.stop(false, true)
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def startStreaming(spark: SparkSession, tableName: String,
+      tablePath: CarbonTablePath, checkpointPath: String): StreamingContext = {
+    var ssc: StreamingContext = null
+    try {
+      // recommendation: use a larger batch interval, such as 30s or 1min
+      ssc = new StreamingContext(spark.sparkContext, Seconds(15))
+      ssc.checkpoint(checkpointPath)
+
+      val readSocketDF = ssc.socketTextStream("localhost", 7071)
+
+      val batchData = readSocketDF
+        .map(_.split(","))
+        .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat))
+
+      batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
+        val df = SparkSession.builder().getOrCreate()
+          .createDataFrame(rdd).toDF("id", "name", "city", "salary")
+        println("at time: " + time.toString() + " the count of received data: " + df.count())
+        df.write
+          .format("carbondata")
+          .option("tableName", tableName)
+          .option("tempCSV", "false")
+          .option("compress", "true")
+          .option("single_pass", "true")
+          .mode(SaveMode.Append)
+          .save()
+      }}
+    } catch {
+      case ex: Exception =>
+        ex.printStackTrace()
+        println("Done reading and writing streaming data")
+    }
+    ssc
+  }
+
+  def writeSocket(serverSocket: ServerSocket): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // wait for a client connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        var index = 0
+        for (_ <- 1 to 1000) {
+          // write a batch of records per iteration
+          for (_ <- 0 to 1000) {
+            index = index + 1
+            socketWriter.println(index.toString + ",name_" + index
+                                 + ",city_" + index + "," + (index * 10000.00).toString +
+                                 ",school_" + index + ":school_" + index + index + "$" + index)
+          }
+          socketWriter.flush()
+          Thread.sleep(1000)
+        }
+        socketWriter.close()
+        System.out.println("Socket closed")
+      }
+    }
+    thread.start()
+    thread
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09fb83ef/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
new file mode 100644
index 0000000..247a59b
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+
+// scalastyle:off println
+object CarbonStructuredStreamingExample {
+  def main(args: Array[String]) {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
+    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+    val metastoredb = s"$rootPath/examples/spark2/target"
+    val streamTableName = s"stream_table"
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    import org.apache.spark.sql.CarbonSession._
+    val spark = SparkSession
+      .builder()
+      .master("local")
+      .appName("CarbonStructuredStreamingExample")
+      .config("spark.sql.warehouse.dir", warehouse)
+      .getOrCreateCarbonSession(storeLocation, metastoredb)
+
+    spark.sparkContext.setLogLevel("ERROR")
+
+    val requireCreateTable = true
+    val useComplexDataType = false
+
+    if (requireCreateTable) {
+      // drop the table if it exists from a previous run
+      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+      // Create target carbon table and populate with initial data
+      if (useComplexDataType) {
+        spark.sql(
+          s"""
+             | CREATE TABLE ${ streamTableName }(
+             | id INT,
+             | name STRING,
+             | city STRING,
+             | salary FLOAT,
+             | file struct<school:array<string>, age:int>
+             | )
+             | STORED BY 'carbondata'
+             | TBLPROPERTIES(
+             | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
+             | """.stripMargin)
+      } else {
+        spark.sql(
+          s"""
+             | CREATE TABLE ${ streamTableName }(
+             | id INT,
+             | name STRING,
+             | city STRING,
+             | salary FLOAT
+             | )
+             | STORED BY 'carbondata'
+             | TBLPROPERTIES(
+             | 'streaming'='true', 'sort_columns'='name')
+             | """.stripMargin)
+      }
+
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
+      val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+      // batch load
+      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+         """.stripMargin)
+
+      // streaming ingest
+      val serverSocket = new ServerSocket(7071)
+      val thread1 = startStreaming(spark, tablePath)
+      val thread2 = writeSocket(serverSocket)
+      val thread3 = showTableCount(spark, streamTableName)
+
+      System.out.println("type enter to interrupt streaming")
+      System.in.read()
+      thread1.interrupt()
+      thread2.interrupt()
+      thread3.interrupt()
+      serverSocket.close()
+    }
+
+    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
+
+    // record(id = 100000001) comes from batch segment_0
+    // record(id = 1) comes from stream segment_1
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+    // range filter query
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id < 10 limit 100").show(100, truncate = false)
+
+    if (useComplexDataType) {
+      // complex
+      spark.sql(s"select file.age, file.school " +
+                s"from ${ streamTableName } " +
+                s"where where file.age = 30 ").show(100, truncate = false)
+    }
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+  def showTableCount(spark: SparkSession, tableName: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        for (_ <- 0 to 1000) {
+          spark.sql(s"select count(*) from $tableName").show(truncate = false)
+          Thread.sleep(1000 * 3)
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def startStreaming(spark: SparkSession, tablePath: CarbonTablePath): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        var qry: StreamingQuery = null
+        try {
+          val readSocketDF = spark.readStream
+            .format("socket")
+            .option("host", "localhost")
+            .option("port", 7071)
+            .load()
+
+          // Write data from socket stream to carbondata file
+          qry = readSocketDF.writeStream
+            .format("carbondata")
+            .trigger(ProcessingTime("5 seconds"))
+            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+            .option("dbName", "default")
+            .option("tableName", "stream_table")
+            .start()
+
+          qry.awaitTermination()
+        } catch {
+          case ex: Exception =>
+            ex.printStackTrace()
+            println("Done reading and writing streaming data")
+        } finally {
+          qry.stop()
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def writeSocket(serverSocket: ServerSocket): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // wait for a client connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        var index = 0
+        for (_ <- 1 to 1000) {
+          // write a batch of records per iteration
+          for (_ <- 0 to 1000) {
+            index = index + 1
+            socketWriter.println(index.toString + ",name_" + index
+                                 + ",city_" + index + "," + (index * 10000.00).toString +
+                                 ",school_" + index + ":school_" + index + index + "$" + index)
+          }
+          socketWriter.flush()
+          Thread.sleep(1000)
+        }
+        socketWriter.close()
+        System.out.println("Socket closed")
+      }
+    }
+    thread.start()
+    thread
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09fb83ef/examples/spark2/src/main/scala/org/apache/carbondata/examples/ExampleUtils.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/ExampleUtils.scala
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/ExampleUtils.scala
index c62a240..fee5e7c 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/ExampleUtils.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/ExampleUtils.scala
@@ -32,11 +32,12 @@ object ExampleUtils {
       .getCanonicalPath
   val storeLocation: String = currentPath + "/target/store"
 
-  def createCarbonSession(appName: String): SparkSession = {
+  def createCarbonSession(appName: String, workThreadNum: Int = 1): SparkSession = {
     val rootPath = new File(this.getClass.getResource("/").getPath
                             + "../../../..").getCanonicalPath
     val storeLocation = s"$rootPath/examples/spark2/target/store"
     val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+    val metastoredb = s"$rootPath/examples/spark2/target"
 
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss")
@@ -44,14 +45,18 @@ object ExampleUtils {
       .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING, "true")
       .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "")
 
+    val masterUrl = if (workThreadNum <= 1) {
+      "local"
+    } else {
+      "local[" + workThreadNum.toString() + "]"
+    }
     import org.apache.spark.sql.CarbonSession._
     val spark = SparkSession
       .builder()
-      .master("local")
-      .appName("CarbonSessionExample")
+      .master(masterUrl)
+      .appName(appName)
       .config("spark.sql.warehouse.dir", warehouse)
-      .config("spark.driver.host", "localhost")
-      .getOrCreateCarbonSession(storeLocation)
+      .getOrCreateCarbonSession(storeLocation, metastoredb)
     spark.sparkContext.setLogLevel("WARN")
     spark
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09fb83ef/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
deleted file mode 100644
index 5ef9d2a..0000000
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples
-
-import java.io.{File, PrintWriter}
-import java.net.ServerSocket
-
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
-
-// scalastyle:off println
-object StreamExample {
-  def main(args: Array[String]) {
-
-    // setup paths
-    val rootPath = new File(this.getClass.getResource("/").getPath
-                            + "../../../..").getCanonicalPath
-    val storeLocation = s"$rootPath/examples/spark2/target/store"
-    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
-    val metastoredb = s"$rootPath/examples/spark2/target"
-    val streamTableName = s"stream_table"
-
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
-
-    import org.apache.spark.sql.CarbonSession._
-    val spark = SparkSession
-      .builder()
-      .master("local")
-      .appName("StreamExample")
-      .config("spark.sql.warehouse.dir", warehouse)
-      .getOrCreateCarbonSession(storeLocation, metastoredb)
-
-    spark.sparkContext.setLogLevel("ERROR")
-
-    val requireCreateTable = true
-    val useComplexDataType = false
-
-    if (requireCreateTable) {
-      // drop table if exists previously
-      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
-      // Create target carbon table and populate with initial data
-      if (useComplexDataType) {
-        spark.sql(
-          s"""
-             | CREATE TABLE ${ streamTableName }(
-             | id INT,
-             | name STRING,
-             | city STRING,
-             | salary FLOAT,
-             | file struct<school:array<string>, age:int>
-             | )
-             | STORED BY 'carbondata'
-             | TBLPROPERTIES(
-             | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
-             | """.stripMargin)
-      } else {
-        spark.sql(
-          s"""
-             | CREATE TABLE ${ streamTableName }(
-             | id INT,
-             | name STRING,
-             | city STRING,
-             | salary FLOAT
-             | )
-             | STORED BY 'carbondata'
-             | TBLPROPERTIES(
-             | 'streaming'='true', 'sort_columns'='name')
-             | """.stripMargin)
-      }
-
-      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
-      val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-      // batch load
-      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
-      spark.sql(
-        s"""
-           | LOAD DATA LOCAL INPATH '$path'
-           | INTO TABLE $streamTableName
-           | OPTIONS('HEADER'='true')
-         """.stripMargin)
-
-      // streaming ingest
-      val serverSocket = new ServerSocket(7071)
-      val thread1 = startStreaming(spark, tablePath)
-      val thread2 = writeSocket(serverSocket)
-      val thread3 = showTableCount(spark, streamTableName)
-
-      System.out.println("type enter to interrupt streaming")
-      System.in.read()
-      thread1.interrupt()
-      thread2.interrupt()
-      thread3.interrupt()
-      serverSocket.close()
-    }
-
-    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
-
-    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
-
-    // record(id = 100000001) comes from batch segment_0
-    // record(id = 1) comes from stream segment_1
-    spark.sql(s"select * " +
-              s"from ${ streamTableName } " +
-              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
-
-    // not filter
-    spark.sql(s"select * " +
-              s"from ${ streamTableName } " +
-              s"where id < 10 limit 100").show(100, truncate = false)
-
-    if (useComplexDataType) {
-      // complex
-      spark.sql(s"select file.age, file.school " +
-                s"from ${ streamTableName } " +
-                s"where where file.age = 30 ").show(100, truncate = false)
-    }
-
-    spark.stop()
-    System.out.println("streaming finished")
-  }
-
-  def showTableCount(spark: SparkSession, tableName: String): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        for (_ <- 0 to 1000) {
-          spark.sql(s"select count(*) from $tableName").show(truncate = false)
-          Thread.sleep(1000 * 3)
-        }
-      }
-    }
-    thread.start()
-    thread
-  }
-
-  def startStreaming(spark: SparkSession, tablePath: CarbonTablePath): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        var qry: StreamingQuery = null
-        try {
-          val readSocketDF = spark.readStream
-            .format("socket")
-            .option("host", "localhost")
-            .option("port", 7071)
-            .load()
-
-          // Write data from socket stream to carbondata file
-          qry = readSocketDF.writeStream
-            .format("carbondata")
-            .trigger(ProcessingTime("5 seconds"))
-            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
-            .option("dbName", "default")
-            .option("tableName", "stream_table")
-            .start()
-
-          qry.awaitTermination()
-        } catch {
-          case ex: Exception =>
-            ex.printStackTrace()
-            println("Done reading and writing streaming data")
-        } finally {
-          qry.stop()
-        }
-      }
-    }
-    thread.start()
-    thread
-  }
-
-  def writeSocket(serverSocket: ServerSocket): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        // wait for client to connection request and accept
-        val clientSocket = serverSocket.accept()
-        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
-        var index = 0
-        for (_ <- 1 to 1000) {
-          // write 5 records per iteration
-          for (_ <- 0 to 1000) {
-            index = index + 1
-            socketWriter.println(index.toString + ",name_" + index
-                                 + ",city_" + index + "," + (index * 10000.00).toString +
-                                 ",school_" + index + ":school_" + index + index + "$" + index)
-          }
-          socketWriter.flush()
-          Thread.sleep(1000)
-        }
-        socketWriter.close()
-        System.out.println("Socket closed")
-      }
-    }
-    thread.start()
-    thread
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09fb83ef/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 4dbdfb0..7b823ac 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -361,7 +361,7 @@ public class StreamSegment {
   }
 
   /**
-   * update carbonindex file after after a stream batch.
+   * update carbonindex file after a stream batch.
    */
   public static void updateIndexFile(String segmentDir) throws IOException {
     FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);

