Subject: [jira] [Created] (CARBONDATA-1814) (Carbon1.3.0 - Streaming) NullPointerException in spark shell when streaming is started with the table streaming property altered from default (false) to true
From: "Chetan Bhat (JIRA)"
Date: Mon, 27 Nov 2017 08:56:01 +0000 (UTC)

Chetan Bhat created CARBONDATA-1814:
---------------------------------------

             Summary: (Carbon1.3.0 - Streaming) NullPointerException in spark shell when streaming is started with the table streaming property altered from default (false) to true
                 Key: CARBONDATA-1814
                 URL: https://issues.apache.org/jira/browse/CARBONDATA-1814
             Project: CarbonData
          Issue Type: Bug
          Components: other
    Affects Versions: 1.3.0
         Environment: 3 node ant cluster
            Reporter: Chetan Bhat

Steps:
The Thrift Server is started via spark-submit.
The user starts the spark shell using the command:

bin/spark-shell --master yarn-client --executor-memory 10G --executor-cores 5 --driver-memory 5G --num-executors 3 --jars /srv/spark2.2Bigdata/install/spark/sparkJdbc/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar

In the spark shell the user tries to start streaming with the table streaming property altered from the default (false) to true.

scala> import java.io.{File, PrintWriter}
import java.io.{File, PrintWriter}

scala> import java.net.ServerSocket
import java.net.ServerSocket

scala> import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.{CarbonEnv, SparkSession}

scala> import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.hive.CarbonRelation

scala> import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}

scala> import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.constants.CarbonCommonConstants

scala> import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.CarbonProperties

scala> import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}

scala> CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
res0: org.apache.carbondata.core.util.CarbonProperties = org.apache.carbondata.core.util.CarbonProperties@69ee0861

scala> import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.CarbonSession._

scala> val carbonSession = SparkSession.
     |   builder().
     |   appName("StreamExample").
     |   getOrCreateCarbonSession("hdfs://hacluster/user/hive/warehouse/carbon.store")
carbonSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.CarbonSession@6ce365b7

scala> carbonSession.sparkContext.setLogLevel("INFO")

scala> def sql(sql: String) = carbonSession.sql(sql)
sql: (sql: String)org.apache.spark.sql.DataFrame

scala> def writeSocket(serverSocket: ServerSocket): Thread = {
     |   val thread = new Thread() {
     |     override def run(): Unit = {
     |       // wait for a client connection request and accept it
     |       val clientSocket = serverSocket.accept()
     |       val socketWriter = new PrintWriter(clientSocket.getOutputStream())
     |       var index = 0
     |       for (_ <- 1 to 1000) {
     |         // write 101 records per iteration (0 to 100 inclusive)
     |         for (_ <- 0 to 100) {
     |           index = index + 1
     |           socketWriter.println(index.toString + ",name_" + index
     |             + ",city_" + index + "," + (index * 10000.00).toString +
     |             ",school_" + index + ":school_" + index + index + "$" + index)
     |         }
     |         socketWriter.flush()
     |         Thread.sleep(2000)
     |       }
     |       socketWriter.close()
     |       System.out.println("Socket closed")
     |     }
     |   }
     |   thread.start()
     |   thread
     | }
writeSocket: (serverSocket: java.net.ServerSocket)Thread

scala> def startStreaming(spark: SparkSession, tablePath: CarbonTablePath, tableName: String, port: Int): Thread = {
     |   val thread = new Thread() {
     |     override def run(): Unit = {
     |       var qry: StreamingQuery = null
     |       try {
     |         val readSocketDF = spark.readStream
     |           .format("socket")
     |           .option("host", "10.18.98.34")
     |           .option("port", port)
     |           .load()
     |
     |         qry = readSocketDF.writeStream
     |           .format("carbondata")
     |           .trigger(ProcessingTime("5 seconds"))
     |           .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
     |           .option("tablePath", tablePath.getPath).option("tableName", tableName)
     |           .start()
     |
     |         qry.awaitTermination()
     |       } catch {
     |         case ex: Throwable =>
     |           ex.printStackTrace()
     |           println("Done reading and writing streaming data")
     |       } finally {
     |         qry.stop()
     |       }
     |     }
     |   }
     |   thread.start()
     |   thread
     | }
startStreaming: (spark: org.apache.spark.sql.SparkSession, tablePath: org.apache.carbondata.core.util.path.CarbonTablePath, tableName: String, port: Int)Thread
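Note: the finally block in startStreaming above calls qry.stop() unconditionally. When writeStream.start() throws before qry is assigned (which is exactly what happens later in this report), the cleanup itself dereferences null, presumably producing the secondary NullPointerException seen in thread "Thread-82" below. A minimal null-safe variant of that cleanup, as a sketch only (it does not address the underlying streaming-table check):

import org.apache.spark.sql.streaming.StreamingQuery

// Sketch only: null-safe cleanup for the streaming thread above.
// qry stays null when start() throws, so stop() must be guarded.
def safeStop(qry: StreamingQuery): Unit = {
  if (qry != null) {
    qry.stop()
  }
}

With such a guard the original CarbonStreamException would still surface, but without the misleading secondary NPE.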
scala> val streamTableName = "all_datatypes_2048"
streamTableName: String = all_datatypes_2048

scala> sql(s"create table all_datatypes_2048 (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string, deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string, deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string, Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string, gamePointId double, contractNumber BigInt) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('table_blocksize'='2048')")
res4: org.apache.spark.sql.DataFrame = []

scala> sql(s"LOAD DATA INPATH 'hdfs://hacluster/chetan/100_olap_C20.csv' INTO table all_datatypes_2048 options ('DELIMITER'=',', 'BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')")
res5: org.apache.spark.sql.DataFrame = []
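For comparison, declaring the streaming property at CREATE time is the documented path for CarbonData streaming tables. A minimal sketch using the sql helper defined above, with a deliberately simplified schema and a hypothetical table name:

// Sketch only: hypothetical table created as a streaming table up front,
// instead of ALTERing the property afterwards as this report does.
sql(
  """CREATE TABLE stream_demo (id INT, name STRING)
    |STORED BY 'org.apache.carbondata.format'
    |TBLPROPERTIES('streaming'='true')""".stripMargin)

The bug reported here is that the ALTER TABLE route below does not appear to be honoured by the streaming sink.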
scala> sql(s"ALTER TABLE all_datatypes_2048 SET TBLPROPERTIES('streaming'='true')")
res6: org.apache.spark.sql.DataFrame = []

scala> val carbonTable = CarbonEnv.getInstance(carbonSession).carbonMetastore.
     |   lookupRelation(Some("default"), streamTableName)(carbonSession).asInstanceOf[CarbonRelation].carbonTable
carbonTable: org.apache.carbondata.core.metadata.schema.table.CarbonTable = org.apache.carbondata.core.metadata.schema.table.CarbonTable@77648a90

scala> val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
tablePath: org.apache.carbondata.core.util.path.CarbonTablePath = hdfs://hacluster/user/hive/warehouse/carbon.store/default/all_datatypes_2048

scala> val port = 8010
port: Int = 8010

scala> val serverSocket = new ServerSocket(port)
serverSocket: java.net.ServerSocket = ServerSocket[addr=0.0.0.0/0.0.0.0,localport=8010]

scala> val socketThread = writeSocket(serverSocket)
socketThread: Thread = Thread[Thread-81,5,main]

scala> val streamingThread = startStreaming(carbonSession, tablePath, streamTableName, port)

Issue: NullPointerException in the spark shell when streaming is started with the table streaming property altered from the default (false) to true. Streaming fails.

scala> org.apache.carbondata.streaming.CarbonStreamException: Table default.all_datatypes_2048 is not a streaming table
        at org.apache.spark.sql.CarbonSource.createSink(CarbonSource.scala:242)
        at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:274)
        at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:266)
        at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:51)
Done reading and writing streaming data
Exception in thread "Thread-82" java.lang.NullPointerException
        at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:59)

Expected: Streaming should continue successfully, without any failure or exception, after the table streaming property is altered from the default (false) to true.
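The first exception is raised in CarbonSource.createSink (CarbonSource.scala:242), i.e. the sink does not see the table as a streaming table even though the ALTER statement returned successfully. As a diagnostic step (an assumption, not from the original report), the effective table properties can be inspected after the ALTER to check whether the streaming flag was actually persisted:

// Sketch only: check whether the streaming flag is visible on the table
// after the ALTER; DESCRIBE FORMATTED lists the table's properties.
sql("DESCRIBE FORMATTED all_datatypes_2048").show(200, truncate = false)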