carbondata-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Bhavya Aggarwal (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (CARBONDATA-1790) (Carbon1.3.0 - Streaming) Data load in Stream Segment fails if batch load is performed in between the streaming
Date Thu, 28 Dec 2017 07:41:00 GMT

    [ https://issues.apache.org/jira/browse/CARBONDATA-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305133#comment-16305133
] 

Bhavya Aggarwal commented on CARBONDATA-1790:
---------------------------------------------

This bug is invalid because the steps given violate the current design: in the current design
we can only have a single stream per table. If we open up sockets multiple times, the
offset error will definitely occur, since sockets are not replayable. This is default Spark behavior;
if you want to test it you can open a file stream and then try to do batch load in between.
I have written a small code to verify whether batch loads work with the stream and the code
is given below. You can keep moving different files into the directory from which you are
creating the stream in this case it is /home/bhavya/stream. Please note that you should move
the file to this folder after the stream is started. Both the streaming data and batch data
is consumed concurrently without any issue. 

import java.io.{File, PrintWriter}
import java.net.ServerSocket

import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, StreamingQuery}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{CarbonEnv, SparkSession}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}

// scalastyle:off println
object StreamExample {

  // Narrow exception matching in the streaming thread: InterruptedException is the
  // expected shutdown path; NonFatal lets OutOfMemoryError etc. propagate.
  import scala.util.control.NonFatal

  /**
   * Demonstrates that batch LOAD DATA commands can run concurrently with a
   * file-based streaming ingest into the same Carbon streaming table.
   *
   * Drop CSV files into /home/bhavya/stream AFTER the stream has started; two
   * batch loads from /home/bhavya/carbonData/test.csv are interleaved with the
   * stream. A file source (unlike a socket source) is replayable, so the
   * checkpointed offsets stay consistent across interleaved batch loads.
   */
  def main(args: Array[String]): Unit = {

    // setup paths (relative to the compiled classes directory)
    val rootPath = new File(this.getClass.getResource("/").getPath
                            + "../../../..").getCanonicalPath
    val storeLocation = s"$rootPath/examples/spark2/target/store"
    // val storeLocation = s"hdfs://localhost:54311/stream"
    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
    val metastoredb = s"$rootPath/examples/spark2/target"
    val streamTableName = s"stream_table_2"

    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")

    import org.apache.spark.sql.CarbonSession._
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("StreamExample")
      .config("spark.sql.warehouse.dir", warehouse)
      .getOrCreateCarbonSession(storeLocation, metastoredb)

    spark.sparkContext.setLogLevel("ERROR")

    // drop table if it exists from a previous run
    spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")

    // create the streaming table
    spark.sql(
      s"""
         | CREATE TABLE ${ streamTableName }(
         | id INT,
         | name STRING,
         | city STRING,
         | salary FLOAT
         | )
         | STORED BY 'carbondata'
         | TBLPROPERTIES(
         | 'streaming'='true', 'sort_columns'='name')
         | """.stripMargin)

    val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)

    val streamingThread = startStreaming(spark, tablePath, streamTableName)
    val countThread = showTableCount(spark, streamTableName)

    // give the stream time to pick up data before the first batch load
    Thread.sleep(15000)
    val path = s"/home/bhavya/carbonData/test.csv"
    spark.sql(
      s"""
         | LOAD DATA LOCAL INPATH '$path'
         | INTO TABLE $streamTableName
         | OPTIONS('HEADER'='true')
       """.stripMargin)
    Thread.sleep(15000)
    spark.sql(
      s"""
         | LOAD DATA LOCAL INPATH '$path'
         | INTO TABLE $streamTableName
         | OPTIONS('HEADER'='true')
       """.stripMargin)

    System.out.println("type enter to interrupt streaming")
    System.in.read()
    streamingThread.interrupt()
    countThread.interrupt()

    spark.sql(s"select count(*) from ${ streamTableName }").show()
    spark.sql(s"select * from ${ streamTableName }").show(500, truncate = false)
    spark.stop()
    System.out.println("streaming finished")
  }

  /**
   * Starts a daemon-style thread that prints the row count of `tableName`
   * every 3 seconds, up to 101 times, so concurrent ingestion is visible.
   *
   * @return the already-started thread, so the caller can interrupt it
   */
  def showTableCount(spark: SparkSession, tableName: String): Thread = {
    val thread = new Thread() {
      override def run(): Unit = {
        for (_ <- 0 to 100) {
          spark.sql(s"select count(*) from $tableName").show(truncate = false)
          Thread.sleep(1000 * 3)
        }
      }
    }
    thread.start()
    thread
  }

  /**
   * Starts a CSV file-source streaming query that writes into the Carbon table.
   *
   * @param tablePath  used only for the checkpoint location
   * @param tableName  target table; defaults to "stream_table_2" for backward
   *                   compatibility with existing callers
   * @return the already-started thread, so the caller can interrupt it
   */
  def startStreaming(spark: SparkSession, tablePath: CarbonTablePath,
      tableName: String = "stream_table_2"): Thread = {
    val thread = new Thread() {
      override def run(): Unit = {
        var qry: StreamingQuery = null
        try {
          val userSchema = StructType(
            Array(StructField("id", StringType),
              StructField("name", StringType),
              StructField("city", StringType),
              StructField("salary", StringType)))
          // A file source is replayable (unlike a socket), so batch loads can
          // safely interleave with the stream.
          val readSocketDF = spark.readStream
            .format("csv")
            .option("path", "/home/bhavya/stream")
            .option("sep", ",")
            .schema(userSchema)
            .load()

          import spark.implicits._
          // Write data from the file stream into the carbondata table
          qry = readSocketDF.map { x =>
            x.get(0) + "," + x.get(1) + "," + x.get(2) + "," + x.get(3)
          }.writeStream
            .format("carbondata")
            .trigger(ProcessingTime("1 seconds"))
            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
            .option("dbName", "default")
            .option("tableName", tableName)
            .start()
          qry.awaitTermination()
        } catch {
          case _: InterruptedException =>
            // expected path when main() interrupts this thread
            println("Done reading and writing streaming data")
          case NonFatal(ex) =>
            ex.printStackTrace()
            println("Done reading and writing streaming data")
        } finally {
          // qry stays null if start() itself failed — guard against NPE
          if (qry != null) {
            qry.stop()
          }
        }
      }
    }
    thread.start()
    thread
  }
}
// scalastyle:on println


> (Carbon1.3.0 - Streaming) Data load in Stream Segment fails if batch load is performed
in between the streaming
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: CARBONDATA-1790
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-1790
>             Project: CarbonData
>          Issue Type: Bug
>          Components: data-query
>    Affects Versions: 1.3.0
>         Environment: 3 node ant cluster
>            Reporter: Ramakrishna S
>            Assignee: Bhavya Aggarwal
>              Labels: DFX
>
> Steps :
> 1. Create a streaming table and do a batch load
> 2. Set up the Streaming , so that it does streaming in chunk of 1000 records 20 times
> 3. Do another batch load on the table
> 4. Do one more time streaming
> +-------------+------------+--------------------------+--------------------------+--------------+------------+--+
> | Segment Id  |   Status   |     Load Start Time      |      Load End Time       | File
Format  | Merged To  |
> +-------------+------------+--------------------------+--------------------------+--------------+------------+--+
> | 2           | Success    | 2017-11-21 21:42:36.77   | 2017-11-21 21:42:40.396  | COLUMNAR_V3
 | NA         |
> | 1           | Streaming  | 2017-11-21 21:40:46.2    | NULL                     | ROW_V1
      | NA         |
> | 0           | Success    | 2017-11-21 21:40:39.782  | 2017-11-21 21:40:43.168  | COLUMNAR_V3
 | NA         |
> +-------------+------------+--------------------------+--------------------------+--------------+------------+--+
> *+Expected:+* Data should be loaded
> *+Actual+* : Data load fails
> 1. One additional offset file is created (marked in bold)
> -rw-r--r--   2 root users         62 2017-11-21 21:40 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/0
> -rw-r--r--   2 root users         63 2017-11-21 21:40 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/1
> -rw-r--r--   2 root users         63 2017-11-21 21:42 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/10
> -rw-r--r--   2 root users         63 2017-11-21 21:40 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/2
> -rw-r--r--   2 root users         63 2017-11-21 21:41 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/3
> -rw-r--r--   2 root users         64 2017-11-21 21:41 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/4
> -rw-r--r--   2 root users         64 2017-11-21 21:41 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/5
> -rw-r--r--   2 root users         64 2017-11-21 21:41 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/6
> -rw-r--r--   2 root users         64 2017-11-21 21:41 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/7
> -rw-r--r--   2 root users         64 2017-11-21 21:41 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/8
> *-rw-r--r--   2 root users         63 2017-11-21 21:42 /user/hive/warehouse/Ram/default/stream_table5/.streaming/checkpoint/offsets/9*
> 2. Following error thrown:
> === Streaming Query ===
> Identifier: [id = 3a5334bc-d471-4676-b6ce-f21105d491d1, runId = b2be9f97-8141-46be-89db-9a0f98d13369]
> Current Offsets: {org.apache.spark.sql.execution.streaming.TextSocketSource@14c45193:
1000}
> Current State: ACTIVE
> Thread State: RUNNABLE
> Logical Plan:
> org.apache.spark.sql.execution.streaming.TextSocketSource@14c45193
>         at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:284)
>         at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)
> Caused by: java.lang.RuntimeException: Offsets committed out of order: 20019 followed
by 1000
>         at scala.sys.package$.error(package.scala:27)
>         at org.apache.spark.sql.execution.streaming.TextSocketSource.commit(socket.scala:151)
>         at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$2$$anonfun$apply$mcV$sp$4.apply(StreamExecution.scala:421)
>         at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$2$$anonfun$apply$mcV$sp$4.apply(StreamExecution.scala:420)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
>         at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$2.apply$mcV$sp(StreamExecution.scala:420)
>         at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$2.apply(StreamExecution.scala:404)
>         at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$2.apply(StreamExecution.scala:404)
>         at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch(StreamExecution.scala:404)
>         at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:250)
>         at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
>         at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
>         at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
>         at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
>         at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
>         ... 1 more
> Done reading and writing streaming data
> Socket closed



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message