Date: Sun, 3 Sep 2017 16:54:00 +0000 (UTC)
From: "Zhichao Zhang (JIRA)"
To: issues@carbondata.apache.org
Reply-To: dev@carbondata.apache.org
Subject: [jira] [Updated] (CARBONDATA-1445) if 'carbon.update.persist.enable'='false', it will fail to update data

     [ https://issues.apache.org/jira/browse/CARBONDATA-1445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhichao Zhang updated CARBONDATA-1445:
---------------------------------------
    Description: 

When updating data, if 'carbon.update.persist.enable' is set to 'false', the update fails.
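(The property can be disabled either in carbon.properties or programmatically before the update runs; a minimal sketch, using the CarbonProperties API:)

import org.apache.carbondata.core.util.CarbonProperties

// disable persisting the intermediate update dataset (the default is 'true')
CarbonProperties.getInstance()
  .addProperty("carbon.update.persist.enable", "false")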
I debugged the code and found that, in the method LoadTable.processData, 'dataFrameWithTupleId' calls the UDF 'getTupleId()', which is registered in CarbonEnv.init() as 'sparkSession.udf.register("getTupleId", () => "")'. That UDF returns a blank string, which is then passed to 'CarbonUpdateUtil.getRequiredFieldFromTID', so an ArrayIndexOutOfBoundsException occurs (see the sketch after the error logs below).

*the plans (logical and physical) for dataFrameWithTupleId:*

== Parsed Logical Plan ==
'Project [unresolvedalias('stringField3, None), unresolvedalias('intField, None), unresolvedalias('longField, None), unresolvedalias('int2Field, None), unresolvedalias('stringfield1-updatedColumn, None), unresolvedalias('stringfield2-updatedColumn, None), UDF('tupleId) AS segId#286]
+- Project [stringField3#113, intField#114, longField#115L, int2Field#116, UDF:getTupleId() AS tupleId#262, concat(stringField1#111, _test) AS stringfield1-updatedColumn#263, concat(stringField2#112, _test) AS stringfield2-updatedColumn#264]
   +- Filter (isnotnull(stringField3#113) && (stringField3#113 = 1))
      +- Relation[stringField1#111,stringField2#112,stringField3#113,intField#114,longField#115L,int2Field#116] CarbonDatasourceHadoopRelation [ Database name :default, Table name :study_carbondata, Schema :Some(StructType(StructField(stringField1,StringType,true), StructField(stringField2,StringType,true), StructField(stringField3,StringType,true), StructField(intField,IntegerType,true), StructField(longField,LongType,true), StructField(int2Field,IntegerType,true))) ]

== Analyzed Logical Plan ==
stringField3: string, intField: int, longField: bigint, int2Field: int, stringfield1-updatedColumn: string, stringfield2-updatedColumn: string, segId: string
Project [stringField3#113, intField#114, longField#115L, int2Field#116, stringfield1-updatedColumn#263, stringfield2-updatedColumn#264, UDF(tupleId#262) AS segId#286]
+- Project [stringField3#113, intField#114, longField#115L, int2Field#116, UDF:getTupleId() AS tupleId#262, concat(stringField1#111, _test) AS stringfield1-updatedColumn#263, concat(stringField2#112, _test) AS stringfield2-updatedColumn#264]
   +- Filter (isnotnull(stringField3#113) && (stringField3#113 = 1))
      +- Relation[stringField1#111,stringField2#112,stringField3#113,intField#114,longField#115L,int2Field#116] CarbonDatasourceHadoopRelation [ Database name :default, Table name :study_carbondata, Schema :Some(StructType(StructField(stringField1,StringType,true), StructField(stringField2,StringType,true), StructField(stringField3,StringType,true), StructField(intField,IntegerType,true), StructField(longField,LongType,true), StructField(int2Field,IntegerType,true))) ]

== Optimized Logical Plan ==
CarbonDictionaryCatalystDecoder [CarbonDecoderRelation(Map(int2Field#116 -> int2Field#116, longField#115L -> longField#115L, stringField2#112 -> stringField2#112, stringField1#111 -> stringField1#111, stringField3#113 -> stringField3#113, intField#114 -> intField#114),CarbonDatasourceHadoopRelation [ Database name :default, Table name :study_carbondata, Schema :Some(StructType(StructField(stringField1,StringType,true), StructField(stringField2,StringType,true), StructField(stringField3,StringType,true), StructField(intField,IntegerType,true), StructField(longField,LongType,true), StructField(int2Field,IntegerType,true))) ])], ExcludeProfile(ArrayBuffer(stringField2#112, stringField1#111)), CarbonAliasDecoderRelation(), true
+- Project [stringField3#113, intField#114, longField#115, int2Field#116, concat(stringField1#111, _test) AS stringfield1-updatedColumn#263, concat(stringField2#112, _test) AS stringfield2-updatedColumn#264, UDF(UDF:getTupleId()) AS segId#286]
   +- Filter (isnotnull(stringField3#113) && (stringField3#113 = 1))
      +- Relation[stringField1#111,stringField2#112,stringField3#113,intField#114,longField#115L,int2Field#116] CarbonDatasourceHadoopRelation [ Database name :default, Table name :study_carbondata, Schema :Some(StructType(StructField(stringField1,StringType,true), StructField(stringField2,StringType,true), StructField(stringField3,StringType,true), StructField(intField,IntegerType,true), StructField(longField,LongType,true), StructField(int2Field,IntegerType,true))) ]

== Physical Plan ==
*CarbonDictionaryDecoder [CarbonDecoderRelation(Map(int2Field#116 -> int2Field#116, longField#115L -> longField#115L, stringField2#112 -> stringField2#112, stringField1#111 -> stringField1#111, stringField3#113 -> stringField3#113, intField#114 -> intField#114),CarbonDatasourceHadoopRelation [ Database name :default, Table name :study_carbondata, Schema :Some(StructType(StructField(stringField1,StringType,true), StructField(stringField2,StringType,true), StructField(stringField3,StringType,true), StructField(intField,IntegerType,true), StructField(longField,LongType,true), StructField(int2Field,IntegerType,true))) ])], ExcludeProfile(ArrayBuffer(stringField2#112, stringField1#111)), CarbonAliasDecoderRelation(), org.apache.spark.sql.CarbonSession@9e4388d
+- *Project [stringField3#113, intField#114, longField#115, int2Field#116, concat(stringField1#111, _test) AS stringfield1-updatedColumn#263, concat(stringField2#112, _test) AS stringfield2-updatedColumn#264, UDF(UDF:getTupleId()) AS segId#286]
   +- *Scan CarbonDatasourceHadoopRelation [ Database name :default, Table name :study_carbondata, Schema :Some(StructType(StructField(stringField1,StringType,true), StructField(stringField2,StringType,true), StructField(stringField3,StringType,true), StructField(intField,IntegerType,true), StructField(longField,LongType,true), StructField(int2Field,IntegerType,true))) ] default.study_carbondata[stringField3#113,intField#114,longField#115,stringField2#112,int2Field#116,stringField1#111] PushedFilters: [IsNotNull(stringField3), EqualTo(stringField3,1)]

*My code:*

import spark.implicits._

val df1 = spark.sparkContext.parallelize(0 to 50)
  .map(x => ("a", x.toString(), (x % 2).toString(), x, x.toLong, x * 2))
  .toDF("stringField1", "stringField2", "stringField3", "intField", "longField", "int2Field")

val df2 = spark.sparkContext.parallelize(51 to 100)
  .map(x => ("b", x.toString(), (x % 2).toString(), x, x.toLong, x * 2))
  .toDF("stringField1", "stringField2", "stringField3", "intField", "longField", "int2Field")

val df3 = df1.union(df2)

spark.sql("DROP TABLE IF EXISTS study_carbondata ").show()
spark.sql("""
  | CREATE TABLE IF NOT EXISTS study_carbondata (
  |   stringField1 string,
  |   stringField2 string,
  |   stringField3 string,
  |   intField int,
  |   longField bigint,
  |   int2Field int
  | )
  | STORED BY 'carbondata'
  | TBLPROPERTIES('DICTIONARY_INCLUDE'='stringField1, stringField2, stringField3, longField',
  |   'SORT_COLUMNS'='stringField1, stringField2, stringField3, intField',
  |   'NO_INVERTED_INDEX'='longField',
  |   'TABLE_BLOCKSIZE'='8'
  | )
  """.stripMargin)

val sortScope = "LOCAL_SORT" // GLOBAL_SORT or LOCAL_SORT
df3.write
  .format("carbondata")
  .option("tableName", "study_carbondata")
  .option("compress", "true") // only valid when tempCSV is true
  .option("tempCSV", "false")
  .option("single_pass", "true")
  .option("sort_scope", sortScope) // GLOBAL_SORT or LOCAL_SORT
  .mode(SaveMode.Append)
  .save()

spark.sql("""
  UPDATE study_carbondata a
  SET (a.stringField1, a.stringField2) = (concat(a.stringField1 , "_test" ), concat(a.stringField2 , "_test" ))
  WHERE a.stringField3 = '1'
  """).show(false)

Error logs:

2017-09-04 00:39:23,354 - ERROR - org.apache.carbondata.common.logging.impl.StandardLogService.logErrorMessage(StandardLogService.java:143) - main -main
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 1 times, most recent failure: Lost task 0.0 in stage 21.0 (TID 27, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$7: (string) => string)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:146)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:350)
	at scala.collection.Iterator$class.foreach(Iterator.scala:742)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
	at scala.collection.AbstractIterator.to(Iterator.scala:1194)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
	at org.apache.carbondata.core.mutate.CarbonUpdateUtil.getRequiredFieldFromTID(CarbonUpdateUtil.java:67)
	at org.apache.spark.sql.execution.command.LoadTable$$anonfun$7.apply(carbonTableSchema.scala:866)
	at org.apache.spark.sql.execution.command.LoadTable$$anonfun$7.apply(carbonTableSchema.scala:865)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:144)
	... 26 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
	at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
	at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1462)
	at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1462)
	at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1462)
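The sketch below shows why the blank tuple ID breaks getRequiredFieldFromTID. It is only an illustration, assuming (as the stack trace suggests) that the tuple ID is split on '/' and a field at index 1 is read; the example tuple ID value is made up.

// Illustration only: a real tuple ID has several '/'-separated parts
// (segment id, block id, ...); the placeholder UDF returns "" instead.
val realTupleId  = "0/part-0-0_batchno0-0-1504456762000/0/0" // made-up example
val blankTupleId = ""  // what getTupleId() yields when the frame is not persisted

// same idea as CarbonUpdateUtil.getRequiredFieldFromTID: split, then index
def fieldFromTID(tid: String, index: Int): String = tid.split("/")(index)

fieldFromTID(realTupleId, 1)   // works
fieldFromTID(blankTupleId, 1)  // "".split("/") == Array(""), so index 1 throws
                               // java.lang.ArrayIndexOutOfBoundsException: 1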
> if 'carbon.update.persist.enable'='false', it will fail to update data
> -----------------------------------------------------------------------
>
>                 Key: CARBONDATA-1445
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-1445
>             Project: CarbonData
>          Issue Type: Bug
>          Components: data-load, spark-integration, sql
>    Affects Versions: 1.2.0
>        Environment: CarbonData master branch, Spark 2.1.1
>           Reporter: Zhichao Zhang
>           Priority: Minor
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)