carbondata-issues mailing list archives

From "Zhichao Zhang (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (CARBONDATA-1445) if 'carbon.update.persist.enable'='false', it will fail to update data
Date Sun, 03 Sep 2017 16:54:00 GMT

     [ https://issues.apache.org/jira/browse/CARBONDATA-1445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhichao  Zhang updated CARBONDATA-1445:
---------------------------------------
    Description: 
When updating data with 'carbon.update.persist.enable'='false', the UPDATE fails.
Debugging shows that in the method LoadTable.processData, 'dataFrameWithTupleId' calls the UDF 'getTupleId()', which CarbonEnv.init() registers as 'sparkSession.udf.register("getTupleId", () => "")'. This UDF always returns an empty string, which is then passed to 'CarbonUpdateUtil.getRequiredFieldFromTID'; reading field index 1 from that empty tuple ID fails with ArrayIndexOutOfBoundsException: 1.
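A minimal Scala sketch of this failure mode follows. It assumes, purely for illustration, that a tuple ID is a '/'-separated string and that a field is read by splitting and indexing; 'requiredField', 'requiredFieldOpt' and the separator are hypothetical stand-ins, not the actual code of CarbonUpdateUtil.getRequiredFieldFromTID. Splitting an empty string yields a one-element array, so reading index 1 throws, which is consistent with 'ArrayIndexOutOfBoundsException: 1' in the log.

```scala
object EmptyTupleIdSketch {
  // Hypothetical stand-in for CarbonUpdateUtil.getRequiredFieldFromTID.
  // Assumption: the tuple ID is a '/'-separated string.
  def requiredField(tid: String, index: Int): String =
    tid.split("/")(index)

  // Guarded variant: None instead of an exception when the field is missing.
  def requiredFieldOpt(tid: String, index: Int): Option[String] = {
    val parts = tid.split("/")
    if (index >= 0 && index < parts.length) Some(parts(index)) else None
  }

  def main(args: Array[String]): Unit = {
    // The UDF registered in CarbonEnv.init() always returns "".
    val placeholderTid = ""
    println(placeholderTid.split("/").length) // 1, because "".split("/") is Array("")
    try {
      requiredField(placeholderTid, 1)
      println("no exception")
    } catch {
      case e: ArrayIndexOutOfBoundsException =>
        println(s"ArrayIndexOutOfBoundsException: ${e.getMessage}")
    }
    println(requiredFieldOpt(placeholderTid, 1)) // None
  }
}
```

'requiredFieldOpt' only illustrates detecting the missing field; it is not a proposed patch.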

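In the optimized and physical plans below, the Project that aliases UDF:getTupleId() AS tupleId#262 has been collapsed into its parent, so segId#286 is computed as UDF(UDF:getTupleId()), i.e. directly from the placeholder UDF.

*The logical and physical plans for dataFrameWithTupleId:*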
== Parsed Logical Plan ==
'Project [unresolvedalias('stringField3, None), unresolvedalias('intField, None), unresolvedalias('longField, None), unresolvedalias('int2Field, None), unresolvedalias('stringfield1-updatedColumn, None), unresolvedalias('stringfield2-updatedColumn, None), UDF('tupleId) AS segId#286]
+- Project [stringField3#113, intField#114, longField#115L, int2Field#116, UDF:getTupleId() AS tupleId#262, concat(stringField1#111, _test) AS stringfield1-updatedColumn#263, concat(stringField2#112, _test) AS stringfield2-updatedColumn#264]
   +- Filter (isnotnull(stringField3#113) && (stringField3#113 = 1))
      +- Relation[stringField1#111,stringField2#112,stringField3#113,intField#114,longField#115L,int2Field#116] CarbonDatasourceHadoopRelation [ Database name :default, Table name :study_carbondata, Schema :Some(StructType(StructField(stringField1,StringType,true), StructField(stringField2,StringType,true), StructField(stringField3,StringType,true), StructField(intField,IntegerType,true), StructField(longField,LongType,true), StructField(int2Field,IntegerType,true))) ]

== Analyzed Logical Plan ==
stringField3: string, intField: int, longField: bigint, int2Field: int, stringfield1-updatedColumn: string, stringfield2-updatedColumn: string, segId: string
Project [stringField3#113, intField#114, longField#115L, int2Field#116, stringfield1-updatedColumn#263, stringfield2-updatedColumn#264, UDF(tupleId#262) AS segId#286]
+- Project [stringField3#113, intField#114, longField#115L, int2Field#116, UDF:getTupleId() AS tupleId#262, concat(stringField1#111, _test) AS stringfield1-updatedColumn#263, concat(stringField2#112, _test) AS stringfield2-updatedColumn#264]
   +- Filter (isnotnull(stringField3#113) && (stringField3#113 = 1))
      +- Relation[stringField1#111,stringField2#112,stringField3#113,intField#114,longField#115L,int2Field#116] CarbonDatasourceHadoopRelation [ Database name :default, Table name :study_carbondata, Schema :Some(StructType(StructField(stringField1,StringType,true), StructField(stringField2,StringType,true), StructField(stringField3,StringType,true), StructField(intField,IntegerType,true), StructField(longField,LongType,true), StructField(int2Field,IntegerType,true))) ]

== Optimized Logical Plan ==
CarbonDictionaryCatalystDecoder [CarbonDecoderRelation(Map(int2Field#116 -> int2Field#116, longField#115L -> longField#115L, stringField2#112 -> stringField2#112, stringField1#111 -> stringField1#111, stringField3#113 -> stringField3#113, intField#114 -> intField#114),CarbonDatasourceHadoopRelation [ Database name :default, Table name :study_carbondata, Schema :Some(StructType(StructField(stringField1,StringType,true), StructField(stringField2,StringType,true), StructField(stringField3,StringType,true), StructField(intField,IntegerType,true), StructField(longField,LongType,true), StructField(int2Field,IntegerType,true))) ])], ExcludeProfile(ArrayBuffer(stringField2#112, stringField1#111)), CarbonAliasDecoderRelation(), true
+- Project [stringField3#113, intField#114, longField#115, int2Field#116, concat(stringField1#111, _test) AS stringfield1-updatedColumn#263, concat(stringField2#112, _test) AS stringfield2-updatedColumn#264, UDF(UDF:getTupleId()) AS segId#286]
   +- Filter (isnotnull(stringField3#113) && (stringField3#113 = 1))
      +- Relation[stringField1#111,stringField2#112,stringField3#113,intField#114,longField#115L,int2Field#116] CarbonDatasourceHadoopRelation [ Database name :default, Table name :study_carbondata, Schema :Some(StructType(StructField(stringField1,StringType,true), StructField(stringField2,StringType,true), StructField(stringField3,StringType,true), StructField(intField,IntegerType,true), StructField(longField,LongType,true), StructField(int2Field,IntegerType,true))) ]

== Physical Plan ==
*CarbonDictionaryDecoder [CarbonDecoderRelation(Map(int2Field#116 -> int2Field#116, longField#115L -> longField#115L, stringField2#112 -> stringField2#112, stringField1#111 -> stringField1#111, stringField3#113 -> stringField3#113, intField#114 -> intField#114),CarbonDatasourceHadoopRelation [ Database name :default, Table name :study_carbondata, Schema :Some(StructType(StructField(stringField1,StringType,true), StructField(stringField2,StringType,true), StructField(stringField3,StringType,true), StructField(intField,IntegerType,true), StructField(longField,LongType,true), StructField(int2Field,IntegerType,true))) ])], ExcludeProfile(ArrayBuffer(stringField2#112, stringField1#111)), CarbonAliasDecoderRelation(), org.apache.spark.sql.CarbonSession@9e4388d
+- *Project [stringField3#113, intField#114, longField#115, int2Field#116, concat(stringField1#111, _test) AS stringfield1-updatedColumn#263, concat(stringField2#112, _test) AS stringfield2-updatedColumn#264, UDF(UDF:getTupleId()) AS segId#286]
   +- *Scan CarbonDatasourceHadoopRelation [ Database name :default, Table name :study_carbondata, Schema :Some(StructType(StructField(stringField1,StringType,true), StructField(stringField2,StringType,true), StructField(stringField3,StringType,true), StructField(intField,IntegerType,true), StructField(longField,LongType,true), StructField(int2Field,IntegerType,true))) ] default.study_carbondata[stringField3#113,intField#114,longField#115,stringField2#112,int2Field#116,stringField1#111]
PushedFilters: [IsNotNull(stringField3), EqualTo(stringField3,1)]
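The collapse visible in the optimized and physical plans can be modelled with a toy expression tree in Scala: the child projection aliases getTupleId() as tupleId, and collapsing the parent projection substitutes that alias into the segId expression. The case classes, 'collapse', 'render' and the UDF name 'segIdUdf' are hypothetical simplifications for illustration only; this is not Spark's CollapseProject rule or the Catalyst API.

```scala
object CollapseSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Udf(name: String, args: List[Expr]) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr

  // Replaces references to attributes defined by the child projection
  // with the expressions they alias.
  def substitute(e: Expr, aliases: Map[String, Expr]): Expr = e match {
    case Attr(n)         => aliases.getOrElse(n, e)
    case Udf(n, args)    => Udf(n, args.map(substitute(_, aliases)))
    case Alias(child, n) => Alias(substitute(child, aliases), n)
  }

  // Merges a parent projection into its child projection (simplified model).
  def collapse(parent: List[Expr], child: List[Expr]): List[Expr] = {
    val aliases = child.collect { case Alias(c, n) => n -> c }.toMap
    parent.map(substitute(_, aliases))
  }

  def render(e: Expr): String = e match {
    case Attr(n)         => n
    case Udf(n, args)    => s"$n(${args.map(render).mkString(", ")})"
    case Alias(child, n) => s"${render(child)} AS $n"
  }

  def main(args: Array[String]): Unit = {
    val child  = List(Attr("stringField3"), Alias(Udf("getTupleId", Nil), "tupleId"))
    val parent = List(Attr("stringField3"), Alias(Udf("segIdUdf", List(Attr("tupleId"))), "segId"))
    // Prints "stringField3" and then "segIdUdf(getTupleId()) AS segId".
    collapse(parent, child).map(render).foreach(println)
  }
}
```

After collapsing, nothing named tupleId remains between the placeholder UDF and the segId UDF, which mirrors UDF(UDF:getTupleId()) AS segId#286 in the plans.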

*My code:*
import org.apache.spark.sql.SaveMode
import org.apache.carbondata.core.util.CarbonProperties
// `spark` is a CarbonSession (CarbonData master branch, Spark 2.1.1).
import spark.implicits._

// The bug only reproduces with update persistence disabled. Setting the property
// through CarbonProperties here is an assumption; the report does not show how it was set.
CarbonProperties.getInstance().addProperty("carbon.update.persist.enable", "false")

// 51 rows with stringField1 = "a" and 50 rows with stringField1 = "b";
// stringField3 alternates between "0" and "1".
val df1 = spark.sparkContext.parallelize(0 to 50)
  .map(x => ("a", x.toString(), (x % 2).toString(), x, x.toLong, x * 2))
  .toDF("stringField1", "stringField2", "stringField3", "intField", "longField", "int2Field")

val df2 = spark.sparkContext.parallelize(51 to 100)
  .map(x => ("b", x.toString(), (x % 2).toString(), x, x.toLong, x * 2))
  .toDF("stringField1", "stringField2", "stringField3", "intField", "longField", "int2Field")

val df3 = df1.union(df2)
spark.sql("DROP TABLE IF EXISTS study_carbondata").show()
spark.sql("""
    |  CREATE TABLE IF NOT EXISTS study_carbondata (
    |    stringField1          string,
    |    stringField2          string,
    |    stringField3          string,
    |    intField              int,
    |    longField             bigint,
    |    int2Field             int
    |  )
    |  STORED BY 'carbondata'
    |  TBLPROPERTIES('DICTIONARY_INCLUDE'='stringField1, stringField2, stringField3, longField',
    |    'SORT_COLUMNS'='stringField1, stringField2, stringField3, intField',
    |    'NO_INVERTED_INDEX'='longField',
    |    'TABLE_BLOCKSIZE'='8'
    |  )
  """.stripMargin)

val sortScope = "LOCAL_SORT"  // GLOBAL_SORT or LOCAL_SORT
df3.write
  .format("carbondata")
  .option("tableName", "study_carbondata")
  .option("compress", "true")  // only takes effect when tempCSV is true
  .option("tempCSV", "false")
  .option("single_pass", "true")
  .option("sort_scope", sortScope)
  .mode(SaveMode.Append)
  .save()

// Fails with ArrayIndexOutOfBoundsException when carbon.update.persist.enable is false.
spark.sql("""
  UPDATE study_carbondata a
  SET (a.stringField1, a.stringField2) = (concat(a.stringField1, "_test"), concat(a.stringField2, "_test"))
  WHERE a.stringField3 = '1'
  """).show(false)
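For comparison once the bug is fixed, the rows the UPDATE should touch can be computed with plain Scala collections, with no Spark or CarbonData involved. 'ExpectedUpdate' and its 'Row' case class are hypothetical helpers mirroring the tuples built for df1 and df2. The rows matching stringField3 = '1' are the odd x values in 0 to 100, i.e. 50 rows, 25 from each DataFrame.

```scala
object ExpectedUpdate {
  // Mirrors the tuples built for df1 (0 to 50, "a") and df2 (51 to 100, "b").
  final case class Row(s1: String, s2: String, s3: String, i: Int, l: Long, i2: Int)

  def rows: Seq[Row] =
    (0 to 50).map(x => Row("a", x.toString, (x % 2).toString, x, x.toLong, x * 2)) ++
      (51 to 100).map(x => Row("b", x.toString, (x % 2).toString, x, x.toLong, x * 2))

  // Applies the UPDATE's SET/WHERE logic: append "_test" to stringField1 and
  // stringField2 wherever stringField3 = '1'.
  def updated(rs: Seq[Row]): Seq[Row] =
    rs.map(r => if (r.s3 == "1") r.copy(s1 = r.s1 + "_test", s2 = r.s2 + "_test") else r)

  def main(args: Array[String]): Unit = {
    val after = updated(rows)
    println(rows.size)                           // 101
    println(after.count(_.s1.endsWith("_test"))) // 50
    println(after.count(_.s1 == "a_test"))       // 25
  }
}
```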

*Error logs:*
2017-09-04 00:39:23,354 - ERROR - org.apache.carbondata.common.logging.impl.StandardLogService.logErrorMessage(StandardLogService.java:143) - main -main
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 1 times, most recent failure: Lost task 0.0 in stage 21.0 (TID 27, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$7: (string) => string)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:146)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:350)
        at scala.collection.Iterator$class.foreach(Iterator.scala:742)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
        at scala.collection.AbstractIterator.to(Iterator.scala:1194)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
        at org.apache.carbondata.core.mutate.CarbonUpdateUtil.getRequiredFieldFromTID(CarbonUpdateUtil.java:67)
        at org.apache.spark.sql.execution.command.LoadTable$$anonfun$7.apply(carbonTableSchema.scala:866)
        at org.apache.spark.sql.execution.command.LoadTable$$anonfun$7.apply(carbonTableSchema.scala:865)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:144)
        ... 26 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
        at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1462)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1462)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1462)
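In this log the top-level SparkException only says that a user-defined function failed; the ArrayIndexOutOfBoundsException that explains the failure sits further down the cause chain. The Scala sketch below walks that chain to the innermost throwable. 'rootCause' is a hypothetical helper, and plain RuntimeExceptions stand in for SparkException so the example needs no Spark dependency.

```scala
object RootCauseSketch {
  // Follows getCause until it returns null and returns the innermost throwable.
  @annotation.tailrec
  def rootCause(t: Throwable): Throwable = {
    val c = t.getCause
    if (c == null) t else rootCause(c)
  }

  def main(args: Array[String]): Unit = {
    val inner   = new ArrayIndexOutOfBoundsException("1")
    val wrapped = new RuntimeException("Failed to execute user defined function", inner)
    val outer   = new RuntimeException("Job aborted due to stage failure", wrapped)
    println(rootCause(outer).getClass.getSimpleName) // ArrayIndexOutOfBoundsException
    println(rootCause(outer).getMessage)             // 1
  }
}
```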




> if 'carbon.update.persist.enable'='false', it will fail to update data 
> -----------------------------------------------------------------------
>
>                 Key: CARBONDATA-1445
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-1445
>             Project: CarbonData
>          Issue Type: Bug
>          Components: data-load, spark-integration, sql
>    Affects Versions: 1.2.0
>         Environment: CarbonData master branch, Spark 2.1.1
>            Reporter: Zhichao  Zhang
>            Priority: Minor
>
> When updating data, if set 'carbon.update.persist.enable'='false', it will fail.
> I debug code and find that in the method LoadTable.processData the 'dataFrameWithTupleId'
will call udf 'getTupleId()' which is defined in CarbonEnv.init(): 'sparkSession.udf.register("getTupleId",
() => "")', it will return blank string to 'CarbonUpdateUtil.getRequiredFieldFromTID',
so ArrayIndexOutOfBoundsException occur.
> *the plans (logical and physical) for dataFrameWithTupleId :*
> == Parsed Logical Plan ==
> 'Project [unresolvedalias('stringField3, None), unresolvedalias('intField, None), unresolvedalias('longField,
None), unresolvedalias('int2Field, None), unresolvedalias('stringfield1-updatedColumn, None),
unresolvedalias('stringfield2-updatedColumn, None), UDF('tupleId) AS segId#286]
> +- Project [stringField3#113, intField#114, longField#115L, int2Field#116, UDF:getTupleId()
AS tupleId#262, concat(stringField1#111, _test) AS stringfield1-updatedColumn#263, concat(stringField2#112,
_test) AS stringfield2-updatedColumn#264]
>    +- Filter (isnotnull(stringField3#113) && (stringField3#113 = 1))
>       +- Relation[stringField1#111,stringField2#112,stringField3#113,intField#114,longField#115L,int2Field#116]
CarbonDatasourceHadoopRelation [ Database name :default, Table name :study_carbondata, Schema
:Some(StructType(StructField(stringField1,StringType,true), StructField(stringField2,StringType,true),
StructField(stringField3,StringType,true), StructField(intField,IntegerType,true), StructField(longField,LongType,true),
StructField(int2Field,IntegerType,true))) ]
> == Analyzed Logical Plan ==
> stringField3: string, intField: int, longField: bigint, int2Field: int, stringfield1-updatedColumn:
string, stringfield2-updatedColumn: string, segId: string
> Project [stringField3#113, intField#114, longField#115L, int2Field#116, stringfield1-updatedColumn#263,
stringfield2-updatedColumn#264, UDF(tupleId#262) AS segId#286]
> +- Project [stringField3#113, intField#114, longField#115L, int2Field#116, UDF:getTupleId()
AS tupleId#262, concat(stringField1#111, _test) AS stringfield1-updatedColumn#263, concat(stringField2#112,
_test) AS stringfield2-updatedColumn#264]
>    +- Filter (isnotnull(stringField3#113) && (stringField3#113 = 1))
>       +- Relation[stringField1#111,stringField2#112,stringField3#113,intField#114,longField#115L,int2Field#116]
CarbonDatasourceHadoopRelation [ Database name :default, Table name :study_carbondata, Schema
:Some(StructType(StructField(stringField1,StringType,true), StructField(stringField2,StringType,true),
StructField(stringField3,StringType,true), StructField(intField,IntegerType,true), StructField(longField,LongType,true),
StructField(int2Field,IntegerType,true))) ]
> == Optimized Logical Plan ==
> CarbonDictionaryCatalystDecoder [CarbonDecoderRelation(Map(int2Field#116 -> int2Field#116,
longField#115L -> longField#115L, stringField2#112 -> stringField2#112, stringField1#111
-> stringField1#111, stringField3#113 -> stringField3#113, intField#114 -> intField#114),CarbonDatasourceHadoopRelation
[ Database name :default, Table name :study_carbondata, Schema :Some(StructType(StructField(stringField1,StringType,true),
StructField(stringField2,StringType,true), StructField(stringField3,StringType,true), StructField(intField,IntegerType,true),
StructField(longField,LongType,true), StructField(int2Field,IntegerType,true))) ])], ExcludeProfile(ArrayBuffer(stringField2#112,
stringField1#111)), CarbonAliasDecoderRelation(), true
> +- Project [stringField3#113, intField#114, longField#115, int2Field#116, concat(stringField1#111,
_test) AS stringfield1-updatedColumn#263, concat(stringField2#112, _test) AS stringfield2-updatedColumn#264,
UDF(UDF:getTupleId()) AS segId#286]
>    +- Filter (isnotnull(stringField3#113) && (stringField3#113 = 1))
>       +- Relation[stringField1#111,stringField2#112,stringField3#113,intField#114,longField#115L,int2Field#116]
CarbonDatasourceHadoopRelation [ Database name :default, Table name :study_carbondata, Schema
:Some(StructType(StructField(stringField1,StringType,true), StructField(stringField2,StringType,true),
StructField(stringField3,StringType,true), StructField(intField,IntegerType,true), StructField(longField,LongType,true),
StructField(int2Field,IntegerType,true))) ]
> == Physical Plan ==
> *CarbonDictionaryDecoder [CarbonDecoderRelation(Map(int2Field#116 -> int2Field#116,
longField#115L -> longField#115L, stringField2#112 -> stringField2#112, stringField1#111
-> stringField1#111, stringField3#113 -> stringField3#113, intField#114 -> intField#114),CarbonDatasourceHadoopRelation
[ Database name :default, Table name :study_carbondata, Schema :Some(StructType(StructField(stringField1,StringType,true),
StructField(stringField2,StringType,true), StructField(stringField3,StringType,true), StructField(intField,IntegerType,true),
StructField(longField,LongType,true), StructField(int2Field,IntegerType,true))) ])], ExcludeProfile(ArrayBuffer(stringField2#112,
stringField1#111)), CarbonAliasDecoderRelation(), org.apache.spark.sql.CarbonSession@9e4388d
> +- *Project [stringField3#113, intField#114, longField#115, int2Field#116, concat(stringField1#111,
_test) AS stringfield1-updatedColumn#263, concat(stringField2#112, _test) AS stringfield2-updatedColumn#264,
UDF(UDF:getTupleId()) AS segId#286]
>    +- *Scan CarbonDatasourceHadoopRelation [ Database name :default, Table name :study_carbondata,
Schema :Some(StructType(StructField(stringField1,StringType,true), StructField(stringField2,StringType,true),
StructField(stringField3,StringType,true), StructField(intField,IntegerType,true), StructField(longField,LongType,true),
StructField(int2Field,IntegerType,true))) ] default.study_carbondata[stringField3#113,intField#114,longField#115,stringField2#112,int2Field#116,stringField1#111]
PushedFilters: [IsNotNull(stringField3), EqualTo(stringField3,1)]
> *My code:*
> import org.apache.spark.sql.SaveMode
> import spark.implicits._
>
> val df1 = spark.sparkContext.parallelize(0 to 50)
>   .map(x => ("a", x.toString(), (x % 2).toString(), x, x.toLong, x * 2))
>   .toDF("stringField1", "stringField2", "stringField3", "intField", "longField", "int2Field")
>
> val df2 = spark.sparkContext.parallelize(51 to 100)
>   .map(x => ("b", x.toString(), (x % 2).toString(), x, x.toLong, x * 2))
>   .toDF("stringField1", "stringField2", "stringField3", "intField", "longField", "int2Field")
>
> val df3 = df1.union(df2)
> spark.sql("DROP TABLE IF EXISTS study_carbondata ").show()
> spark.sql("""
>     |  CREATE TABLE IF NOT EXISTS study_carbondata (
>     |    stringField1          string,
>     |    stringField2          string,
>     |    stringField3          string,
>     |    intField              int,
>     |    longField             bigint,
>     |    int2Field             int
>     |  )
>     |  STORED BY 'carbondata'
>     |  TBLPROPERTIES('DICTIONARY_INCLUDE'='stringField1, stringField2, stringField3, longField',
>     |    'SORT_COLUMNS'='stringField1, stringField2, stringField3, intField',
>     |    'NO_INVERTED_INDEX'='longField',
>     |    'TABLE_BLOCKSIZE'='8'
>     |  )
>     """.stripMargin)
> val sortScope = "LOCAL_SORT"  // GLOBAL_SORT or LOCAL_SORT
> df3.write
>   .format("carbondata")
>   .option("tableName", "study_carbondata")
>   .option("compress", "true")  // only takes effect when tempCSV is true
>   .option("tempCSV", "false")
>   .option("single_pass", "true")
>   .option("sort_scope", sortScope)
>   .mode(SaveMode.Append)
>   .save()
> spark.sql("""
>           UPDATE study_carbondata a
>               SET (a.stringField1, a.stringField2) = (concat(a.stringField1, "_test"), concat(a.stringField2, "_test"))
>           WHERE a.stringField3 = '1'
>           """).show(false)
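The reproduction above does not show where 'carbon.update.persist.enable' is switched off. A minimal sketch, assuming the property is set programmatically on the driver through CarbonData's `CarbonProperties` singleton, in the same spark-shell session and before the UPDATE runs. Only the property key and the value "false" come from this report; the placement of the call is an assumption.

```scala
import org.apache.carbondata.core.util.CarbonProperties

// Assumption: set on the driver before the UPDATE statement is executed.
// "false" is the setting under which this report sees the update fail.
CarbonProperties.getInstance()
  .addProperty("carbon.update.persist.enable", "false")
```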
> *Error logs:*
> 2017-09-04 00:39:23,354 - ERROR - org.apache.carbondata.common.logging.impl.StandardLogService.logErrorMessage(StandardLogService.java:143) - main -main
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 1 times, most recent failure: Lost task 0.0 in stage 21.0 (TID 27, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$7: (string) => string)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:146)
>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:350)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>         at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
>         at scala.collection.AbstractIterator.to(Iterator.scala:1194)
>         at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
>         at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
>         at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
>         at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>         at org.apache.spark.scheduler.Task.run(Task.scala:99)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
>         at org.apache.carbondata.core.mutate.CarbonUpdateUtil.getRequiredFieldFromTID(CarbonUpdateUtil.java:67)
>         at org.apache.spark.sql.execution.command.LoadTable$$anonfun$7.apply(carbonTableSchema.scala:866)
>         at org.apache.spark.sql.execution.command.LoadTable$$anonfun$7.apply(carbonTableSchema.scala:865)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:144)
>         ... 26 more
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>         at scala.Option.foreach(Option.scala:257)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>         at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
>         at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>         at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
>         at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1462)
>         at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1462)
>         at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1462)
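The "Caused by" frame in the log above can be reproduced without Spark. The sketch below is a hypothetical, dependency-free Scala model, not CarbonData code. It assumes the tuple ID is a '/'-separated string and that the segment ID is read from field index 1, which is consistent with the "ArrayIndexOutOfBoundsException: 1" thrown from CarbonUpdateUtil.getRequiredFieldFromTID. The names `placeholderGetTupleId`, `segmentIdUnsafe` and `segmentIdSafe` are illustrative only.

```scala
// Hypothetical model of the failure path; none of these names are CarbonData APIs.
object TupleIdSplitDemo {
  // Stand-in for the placeholder UDF the report says CarbonEnv.init registers:
  // sparkSession.udf.register("getTupleId", () => "")
  val placeholderGetTupleId: () => String = () => ""

  // Assumption: the segment ID is field 1 of a '/'-separated tuple ID,
  // matching the index reported in "ArrayIndexOutOfBoundsException: 1".
  val SegmentIdIndex: Int = 1

  // Unguarded lookup, analogous to indexing the split result directly.
  def segmentIdUnsafe(tupleId: String): String =
    tupleId.split("/")(SegmentIdIndex)

  // Guarded lookup: returns None instead of throwing for a malformed tuple ID.
  def segmentIdSafe(tupleId: String): Option[String] = {
    val parts = tupleId.split("/")
    if (parts.length > SegmentIdIndex) Some(parts(SegmentIdIndex)) else None
  }

  def main(args: Array[String]): Unit = {
    val blank = placeholderGetTupleId()
    // "".split("/") returns Array(""): a single empty element, so index 1 does not exist.
    println(blank.split("/").length) // prints 1
    try {
      segmentIdUnsafe(blank)
    } catch {
      case _: ArrayIndexOutOfBoundsException =>
        println("ArrayIndexOutOfBoundsException, as in the stack trace")
    }
    println(segmentIdSafe(blank)) // prints None
  }
}
```

Because Java's `String.split` returns a one-element array for an empty input, any lookup past index 0 fails. A guarded lookup like `segmentIdSafe` only turns the crash into a `None` the caller must handle; per the description, the underlying problem is that the placeholder `getTupleId()` UDF is evaluated instead of a real tuple ID.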



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

