Date: Mon, 8 Jan 2018 12:43:00 +0000 (UTC)
From: "Geetika Gupta (JIRA)"
To: issues@carbondata.apache.org
Subject: [jira] [Updated] (CARBONDATA-2003) Streaming table is not updated on second streaming load

     [ https://issues.apache.org/jira/browse/CARBONDATA-2003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Geetika Gupta updated CARBONDATA-2003:
--------------------------------------
    Description:

I tried the following scenario on spark-shell:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}

val carbon = SparkSession.builder().config(sc.getConf)
  .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore", "/tmp")
import org.apache.carbondata.core.constants.CarbonCommonConstants

CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")

carbon.sql("CREATE TABLE uniqdata_stream_8(CUST_ID int, CUST_NAME string, ACTIVE_EMUI_VERSION string, DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 bigint, BIGINT_COLUMN2 bigint, DECIMAL_COLUMN1 decimal(30,10), DECIMAL_COLUMN2 decimal(36,10), Double_COLUMN1 double, Double_COLUMN2 double, INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('TABLE_BLOCKSIZE'= '256 MB', 'streaming'='true')")

import carbon.sqlContext.implicits._
// needed for StructType, StructField and the column types used below
import org.apache.spark.sql.types._

val uniqdataSch = StructType(
  Array(StructField("CUST_ID", IntegerType), StructField("CUST_NAME", StringType), StructField("ACTIVE_EMUI_VERSION", StringType), StructField("DOB", TimestampType), StructField("DOJ", TimestampType), StructField("BIGINT_COLUMN1", LongType), StructField("BIGINT_COLUMN2", LongType), StructField("DECIMAL_COLUMN1", DecimalType(30, 10)), StructField("DECIMAL_COLUMN2", DecimalType(36, 10)), StructField("Double_COLUMN1", DoubleType), StructField("Double_COLUMN2", DoubleType), StructField("INTEGER_COLUMN1", IntegerType)))

val streamDf = carbon.readStream
  .schema(uniqdataSch)
  .option("sep", ",")
  .csv("file:///home/geetika/Downloads/uniqdata")

val dfToWrite = streamDf.map{x => x.get(0) + "," + x.get(1) + "," + x.get(2) + "," + x.get(3) + "," + x.get(4) + "," + x.get(5) + "," + x.get(6) + "," + x.get(7) + "," + x.get(8) + "," + x.get(9) + "," + x.get(10) + "," + x.get(11)}

val qry = dfToWrite.writeStream
  .format("carbondata")
  .trigger(ProcessingTime("5 seconds"))
  .option("checkpointLocation", "/stream/uniq8")
  .option("dbName", "default")
  .option("tableName", "uniqdata_stream_8")
  .start()

qry.awaitTermination()

Now close this shell and check the record count on the table using:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

val carbon = SparkSession.builder().config(sc.getConf)
  .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore", "/tmp")

carbon.sql("select count(*) from uniqdata_stream_8").show

OUTPUT:

scala> carbon.sql("select count(*) from uniqdata_stream_8").show
18/01/08 15:51:53 ERROR CarbonProperties: Executor task launch worker-0 Configured value for property carbon.number.of.cores.while.loading is wrong. Falling back to the default value 2
+--------+
|count(1)|
+--------+
|    2013|
+--------+

Now repeat the whole scenario and check the count again: it remains 2013, i.e. the table is not updated by the second streaming load.
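As a follow-up check, here is a minimal sketch of how the second run can be inspected from the same shell. The SHOW SEGMENTS query is an assumed convenience for seeing whether the second load appended a new streaming segment at all; it is not output captured from my run:

// Sketch, assuming the same CarbonSession ("carbon") as above, after the second streaming load.
// The count should have grown if the load worked; the observed behaviour is that it stays at 2013.
carbon.sql("select count(*) from uniqdata_stream_8").show

// Assumption: SHOW SEGMENTS lists the table's segments, so the absence of a new
// streaming segment after the second run would localise the problem to the load path.
carbon.sql("SHOW SEGMENTS FOR TABLE uniqdata_stream_8").show(false)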
was: (previous description, identical to the updated description above except that it did not yet contain the CarbonCommonConstants import or the CARBON_BAD_RECORDS_ACTION property line)
> Streaming table is not updated on second streaming load
> --------------------------------------------------------
>
>                 Key: CARBONDATA-2003
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-2003
>             Project: CarbonData
>          Issue Type: Bug
>          Components: data-load
>    Affects Versions: 1.3.0
>         Environment: spark2.1
>            Reporter: Geetika Gupta
>             Fix For: 1.3.0
>
>         Attachments: 2000_UniqData.csv
>
> (The full reproduction steps are in the updated description above.)

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)