carbondata-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Geetika Gupta (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (CARBONDATA-2003) Streaming table is not updated on second streaming load
Date Mon, 08 Jan 2018 12:43:00 GMT

     [ https://issues.apache.org/jira/browse/CARBONDATA-2003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Geetika Gupta updated CARBONDATA-2003:
--------------------------------------
    Description: 
I tried the following scenario on spark shell:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}

val carbon = SparkSession.builder().config(sc.getConf) .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore","/tmp")

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
"FORCE")

carbon.sql("CREATE TABLE uniqdata_stream_8(CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION
string, DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1
decimal(30,10), DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1
int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('TABLE_BLOCKSIZE'= '256 MB',
'streaming'='true')")

import carbon.sqlContext.implicits._

val uniqdataSch = StructType(
Array(StructField("CUST_ID", IntegerType),StructField("CUST_NAME", StringType),StructField("ACTIVE_EMUI_VERSION",
StringType),StructField("DOB", TimestampType), StructField("DOJ", TimestampType), StructField("BIGINT_COLUMN1",
LongType), StructField("BIGINT_COLUMN2", LongType), StructField("DECIMAL_COLUMN1", org.apache.spark.sql.types.DecimalType(30,
10)), StructField("DECIMAL_COLUMN2", org.apache.spark.sql.types.DecimalType(36,10)), StructField("Double_COLUMN1",
DoubleType), StructField("Double_COLUMN2", DoubleType), StructField("INTEGER_COLUMN1", IntegerType)))

val streamDf = carbon.readStream
.schema(uniqdataSch)
.option("sep", ",")
.csv("file:///home/geetika/Downloads/uniqdata")

val dfToWrite = streamDf.map{x => x.get(0) + "," + x.get(1) + "," + x.get(2)+ "," + x.get(3)+
"," + x.get(4)+ "," + x.get(5)+ "," + x.get(6)+ "," + x.get(7)+ "," + x.get(8)+ "," + x.get(9)+
"," + x.get(10)+ "," + x.get(11)}

val qry = dfToWrite.writeStream.format("carbondata").trigger(ProcessingTime("5 seconds"))
             .option("checkpointLocation","/stream/uniq8")
            .option("dbName", "default")
            .option("tableName", "uniqdata_stream_8")
            .start()

          qry.awaitTermination()

Now close this shell and check the record count on the table using :

 import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
val carbon = SparkSession.builder().config(sc.getConf) .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore","/tmp")
carbon.sql("select count(*) from uniqdata_stream_8").show
OUTPUT:
scala> carbon.sql("select count(*) from uniqdata_stream_8").show
18/01/08 15:51:53 ERROR CarbonProperties: Executor task launch worker-0 Configured value for
property carbon.number.of.cores.while.loading is wrong. Falling back to the default value
2
+--------+
|count(1)|
+--------+
|    2013|
+--------+


Again try the above scenario and check the count. It remains same after the second streaming
load.


  was:
I tried the following scenario on spark shell:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}

val carbon = SparkSession.builder().config(sc.getConf) .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore","/tmp")


carbon.sql("CREATE TABLE uniqdata_stream_8(CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION
string, DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1
decimal(30,10), DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1
int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('TABLE_BLOCKSIZE'= '256 MB',
'streaming'='true')")

import carbon.sqlContext.implicits._

val uniqdataSch = StructType(
Array(StructField("CUST_ID", IntegerType),StructField("CUST_NAME", StringType),StructField("ACTIVE_EMUI_VERSION",
StringType),StructField("DOB", TimestampType), StructField("DOJ", TimestampType), StructField("BIGINT_COLUMN1",
LongType), StructField("BIGINT_COLUMN2", LongType), StructField("DECIMAL_COLUMN1", org.apache.spark.sql.types.DecimalType(30,
10)), StructField("DECIMAL_COLUMN2", org.apache.spark.sql.types.DecimalType(36,10)), StructField("Double_COLUMN1",
DoubleType), StructField("Double_COLUMN2", DoubleType), StructField("INTEGER_COLUMN1", IntegerType)))

val streamDf = carbon.readStream
.schema(uniqdataSch)
.option("sep", ",")
.csv("file:///home/geetika/Downloads/uniqdata")

val dfToWrite = streamDf.map{x => x.get(0) + "," + x.get(1) + "," + x.get(2)+ "," + x.get(3)+
"," + x.get(4)+ "," + x.get(5)+ "," + x.get(6)+ "," + x.get(7)+ "," + x.get(8)+ "," + x.get(9)+
"," + x.get(10)+ "," + x.get(11)}

val qry = dfToWrite.writeStream.format("carbondata").trigger(ProcessingTime("5 seconds"))
             .option("checkpointLocation","/stream/uniq8")
            .option("dbName", "default")
            .option("tableName", "uniqdata_stream_8")
            .start()

          qry.awaitTermination()

Now close this shell and check the record count on the table using :

 import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
val carbon = SparkSession.builder().config(sc.getConf) .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore","/tmp")
carbon.sql("select count(*) from uniqdata_stream_8").show
OUTPUT:
scala> carbon.sql("select count(*) from uniqdata_stream_8").show
18/01/08 15:51:53 ERROR CarbonProperties: Executor task launch worker-0 Configured value for
property carbon.number.of.cores.while.loading is wrong. Falling back to the default value
2
+--------+
|count(1)|
+--------+
|    2013|
+--------+


Again try the above scenario and check the count. It remains same after the second streaming
load.



> Streaming table is not updated on second streaming load
> -------------------------------------------------------
>
>                 Key: CARBONDATA-2003
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-2003
>             Project: CarbonData
>          Issue Type: Bug
>          Components: data-load
>    Affects Versions: 1.3.0
>         Environment: spark2.1
>            Reporter: Geetika Gupta
>             Fix For: 1.3.0
>
>         Attachments: 2000_UniqData.csv
>
>
> I tried the following scenario on spark shell:
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.CarbonSession._
> import org.apache.carbondata.core.util.CarbonProperties
> import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
> val carbon = SparkSession.builder().config(sc.getConf) .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore","/tmp")
> import org.apache.carbondata.core.constants.CarbonCommonConstants
> import org.apache.carbondata.core.util.CarbonProperties
> CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
"FORCE")
> carbon.sql("CREATE TABLE uniqdata_stream_8(CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION
string, DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1
decimal(30,10), DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1
int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('TABLE_BLOCKSIZE'= '256 MB',
'streaming'='true')")
> import carbon.sqlContext.implicits._
> val uniqdataSch = StructType(
> Array(StructField("CUST_ID", IntegerType),StructField("CUST_NAME", StringType),StructField("ACTIVE_EMUI_VERSION",
StringType),StructField("DOB", TimestampType), StructField("DOJ", TimestampType), StructField("BIGINT_COLUMN1",
LongType), StructField("BIGINT_COLUMN2", LongType), StructField("DECIMAL_COLUMN1", org.apache.spark.sql.types.DecimalType(30,
10)), StructField("DECIMAL_COLUMN2", org.apache.spark.sql.types.DecimalType(36,10)), StructField("Double_COLUMN1",
DoubleType), StructField("Double_COLUMN2", DoubleType), StructField("INTEGER_COLUMN1", IntegerType)))
> val streamDf = carbon.readStream
> .schema(uniqdataSch)
> .option("sep", ",")
> .csv("file:///home/geetika/Downloads/uniqdata")
> val dfToWrite = streamDf.map{x => x.get(0) + "," + x.get(1) + "," + x.get(2)+ ","
+ x.get(3)+ "," + x.get(4)+ "," + x.get(5)+ "," + x.get(6)+ "," + x.get(7)+ "," + x.get(8)+
"," + x.get(9)+ "," + x.get(10)+ "," + x.get(11)}
> val qry = dfToWrite.writeStream.format("carbondata").trigger(ProcessingTime("5 seconds"))
>              .option("checkpointLocation","/stream/uniq8")
>             .option("dbName", "default")
>             .option("tableName", "uniqdata_stream_8")
>             .start()
>           qry.awaitTermination()
> Now close this shell and check the record count on the table using :
>  import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.CarbonSession._
> val carbon = SparkSession.builder().config(sc.getConf) .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore","/tmp")
> carbon.sql("select count(*) from uniqdata_stream_8").show
> OUTPUT:
> scala> carbon.sql("select count(*) from uniqdata_stream_8").show
> 18/01/08 15:51:53 ERROR CarbonProperties: Executor task launch worker-0 Configured value
for property carbon.number.of.cores.while.loading is wrong. Falling back to the default value
2
> +--------+
> |count(1)|
> +--------+
> |    2013|
> +--------+
> Again try the above scenario and check the count. It remains same after the second streaming
load.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message