Date: Mon, 20 Nov 2017 14:44:00 +0000 (UTC)
From: "Chetan Bhat (JIRA)"
To: issues@carbondata.apache.org
Reply-To: dev@carbondata.apache.org
Subject: [jira] [Updated] (CARBONDATA-1783) (Carbon1.3.0 - Streaming) Error "Failed to filter row in vector reader" when filter query executed on streaming data

     [ https://issues.apache.org/jira/browse/CARBONDATA-1783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chetan Bhat updated CARBONDATA-1783:
------------------------------------
Description:

Steps :-

The Spark Thrift Server is started using the command:

bin/spark-submit --master yarn-client --executor-memory 10G --executor-cores 5 --driver-memory 5G --num-executors 3 --class org.apache.carbondata.spark.thriftserver.CarbonThriftServer /srv/spark2.2Bigdata/install/spark/sparkJdbc/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar "hdfs://hacluster/user/hive/warehouse/carbon.store"

The Spark shell is launched using the command:

bin/spark-shell --master yarn-client --executor-memory 10G --executor-cores 5 --driver-memory 5G --num-executors 3 --jars /srv/spark2.2Bigdata/install/spark/sparkJdbc/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar

From the Spark shell, the user creates a table and loads data into it as shown below.
import java.io.{File, PrintWriter}
import java.net.ServerSocket

import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}

CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")

import org.apache.spark.sql.CarbonSession._

val carbonSession = SparkSession.
  builder().
  appName("StreamExample").
  getOrCreateCarbonSession("hdfs://hacluster/user/hive/warehouse/carbon.store")

carbonSession.sparkContext.setLogLevel("INFO")

def sql(sql: String) = carbonSession.sql(sql)

// Writes CSV-formatted rows to a socket so the streaming query can consume them.
def writeSocket(serverSocket: ServerSocket): Thread = {
  val thread = new Thread() {
    override def run(): Unit = {
      // wait for a client connection request and accept it
      val clientSocket = serverSocket.accept()
      val socketWriter = new PrintWriter(clientSocket.getOutputStream())
      var index = 0
      for (_ <- 1 to 1000) {
        // write 101 records per iteration (0 to 100 is inclusive of both bounds)
        for (_ <- 0 to 100) {
          index = index + 1
          socketWriter.println(index.toString + ",name_" + index
            + ",city_" + index + "," + (index * 10000.00).toString +
            ",school_" + index + ":school_" + index + index + "$" + index)
        }
        socketWriter.flush()
        Thread.sleep(2000)
      }
      socketWriter.close()
      System.out.println("Socket closed")
    }
  }
  thread.start()
  thread
}

// Starts a streaming query that reads from the socket and writes into the CarbonData table.
def startStreaming(spark: SparkSession, tablePath: CarbonTablePath, tableName: String, port: Int): Thread = {
  val thread = new Thread() {
    override def run(): Unit = {
      var qry: StreamingQuery = null
      try {
        val readSocketDF = spark.readStream
          .format("socket")
          .option("host", "10.18.98.34")
          .option("port", port)
          .load()

        qry = readSocketDF.writeStream
          .format("carbondata")
          .trigger(ProcessingTime("5 seconds"))
          .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
          .option("tablePath", tablePath.getPath).option("tableName", tableName)
          .start()

        qry.awaitTermination()
      } catch {
        case ex: Throwable =>
          ex.printStackTrace()
          println("Done reading and writing streaming data")
      } finally {
        // qry stays null if the stream never started, so guard before stopping it
        if (qry != null) qry.stop()
      }
    }
  }
  thread.start()
  thread
}

val streamTableName = "all_datatypes_2048"

sql(s"create table all_datatypes_2048 (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string, deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('streaming'='true','table_blocksize'='2048')")

sql(s"LOAD DATA INPATH 'hdfs://hacluster/chetan/100_olap_C20.csv' INTO table all_datatypes_2048 options ('DELIMITER'=',', 'BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')")

val carbonTable = CarbonEnv.getInstance(carbonSession).carbonMetastore.
  lookupRelation(Some("default"), streamTableName)(carbonSession).asInstanceOf[CarbonRelation].carbonTable
val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
val port = 8007
val serverSocket = new ServerSocket(port)
val socketThread = writeSocket(serverSocket)
val streamingThread = startStreaming(carbonSession, tablePath, streamTableName, port)
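Once the socket writer and the streaming query are running, ingestion can be spot-checked from the same shell. The snippet below is a minimal sketch, not part of the original report; it only assumes the sql helper and streamTableName defined above:

for (_ <- 1 to 5) {
  // the row count should grow roughly once per 5-second trigger interval
  sql(s"select count(*) from $streamTableName").show()
  Thread.sleep(5000)
}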
While the streaming load is in progress, the user executes the below select filter query from Beeline:

select imei,gamePointId, channelsId,series from all_datatypes_2048 where channelsId >=10 OR channelsId <=1 and series='7Series';

(Since AND binds tighter than OR, this predicate parses as channelsId >=10 OR (channelsId <=1 AND series='7Series'), which matches the OrFilterExecuterImpl and GreaterThanEqualToExpression frames in the trace below.)
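For reference, the Beeline session in which the query above is run would typically be opened with a command like the following; the exact invocation is an assumption and not part of the original report, though the JDBC endpoint matches the prompt in the failure output below:

bin/beeline -u jdbc:hive2://10.18.98.34:23040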
*Issue : The select filter query fails with the exception shown below.*

0: jdbc:hive2://10.18.98.34:23040> select imei,gamePointId, channelsId,series from all_datatypes_2048 where channelsId >=10 OR channelsId <=1 and series='7Series';
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 773.0 failed 4 times, most recent failure: Lost task 6.3 in stage 773.0 (TID 33727, BLR1000014269, executor 14): java.io.IOException: Failed to filter row in vector reader
        at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.scanBlockletAndFillVector(CarbonStreamRecordReader.java:423)
        at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.nextColumnarBatch(CarbonStreamRecordReader.java:317)
        at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.nextKeyValue(CarbonStreamRecordReader.java:298)
        at org.apache.carbondata.spark.rdd.CarbonScanRDD$$anon$1.hasNext(CarbonScanRDD.scala:298)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException: [B cannot be cast to org.apache.spark.unsafe.types.UTF8String
        at org.apache.spark.sql.SparkUnknownExpression.evaluate(SparkUnknownExpression.scala:50)
        at org.apache.carbondata.core.scan.expression.conditional.GreaterThanEqualToExpression.evaluate(GreaterThanEqualToExpression.java:38)
        at org.apache.carbondata.core.scan.filter.executer.RowLevelFilterExecuterImpl.applyFilter(RowLevelFilterExecuterImpl.java:272)
        at org.apache.carbondata.core.scan.filter.executer.OrFilterExecuterImpl.applyFilter(OrFilterExecuterImpl.java:49)
        at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.scanBlockletAndFillVector(CarbonStreamRecordReader.java:418)
        ... 20 more
Driver stacktrace: (state=,code=0)

Expected : The select filter query should succeed without any error/exception.

The issue also occurs with the below queries:

select imei,gamePointId, channelsId,series from all_datatypes_2048 where channelsId >=10 OR channelsId <=1 or series='7Series';
select imei,gamePointId, channelsId,series from all_datatypes_2048 where channelsId >=10 OR (channelsId <=1 and series='1Series');
select sum(gamePointId) a from all_datatypes_2048 where channelsId >=10 OR (channelsId <=1 and series='1Series');
select * from (select imei,if(imei='1AA100060',NULL,imei) a from all_datatypes_2048) aa where a IS NULL;
select imei from all_datatypes_2048 where (contractNumber == 5281803) and (gamePointId==2738.562);
select deliveryCity from all_datatypes_2048 where (deliveryCity == 'yichang') and (deliveryStreet=='yichang');
select channelsId from all_datatypes_2048 where (channelsId == '4') and (gamePointId==2738.562);
select imei from all_datatypes_2048 where (contractNumber == 5281803) OR (gamePointId==2738.562) order by contractNumber;
select channelsId from all_datatypes_2048 where (channelsId == '4') OR (gamePointId==2738.562) order by channelsId;
select deliveryCity from all_datatypes_2048 where (deliveryCity == 'yichang') OR (deliveryStreet=='yichang') order by deliveryCity;
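For context on the Caused-by above: "[B" is the JVM's display name for byte[], so the FilterUnsupportedException wraps a ClassCastException from casting a raw byte array to UTF8String. The snippet below is an illustrative sketch only (it is not CarbonData code) showing the failing cast next to the conversion that would succeed:

import org.apache.spark.unsafe.types.UTF8String

val raw: Any = "7Series".getBytes("UTF-8")  // a byte[]; the JVM prints its type as "[B"
// raw.asInstanceOf[UTF8String]             // throws ClassCastException: [B cannot be cast to UTF8String
val converted = UTF8String.fromBytes(raw.asInstanceOf[Array[Byte]])  // explicit conversion succeeds
println(converted)                          // prints: 7Series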
> (Carbon1.3.0 - Streaming) Error "Failed to filter row in vector reader" when filter query executed on streaming data
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: CARBONDATA-1783
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-1783
>             Project: CarbonData
>          Issue Type: Bug
>          Components: data-query
>    Affects Versions: 1.3.0
>         Environment: 3 node ant cluster
>            Reporter: Chetan Bhat
>              Labels: DFX
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)