carbondata-issues mailing list archives

From zzcclp <...@git.apache.org>
Subject [GitHub] carbondata pull request #1266: [CARBONDATA-1393] Avoid to throw NPE when exe...
Date Mon, 21 Aug 2017 08:11:31 GMT
Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1266#discussion_r134168452
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
---
    @@ -105,13 +105,15 @@ private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequested
       }
     
       public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
    -    taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
    -    allocator.free(memoryBlock);
    -    memoryUsed -= memoryBlock.size();
    -    memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
    -    LOGGER.info(
    -        "Freeing memory of size: " + memoryBlock.size() + "available memory:  " + (totalMemory
    -            - memoryUsed));
    +    if (taskIdToMemoryBlockMap.containsKey(taskId)) {
    --- End diff --
    
    @jackylk , @ravipesala  I ran the following code in my IDE:
    
    ```scala
    spark.sql("""
            |  CREATE TABLE IF NOT EXISTS study_carbondata (
            |    stringField1          string,
            |    stringField2          string, 
            |    stringField3          string, 
            |    intField              int, 
            |    longField             bigint,
            |    int2Field             int 
            |  )
            |  STORED BY 'carbondata'
            |  TBLPROPERTIES('DICTIONARY_INCLUDE'='stringField1, stringField2, stringField3, longField',
            |    'SORT_COLUMNS'='stringField1, stringField2, stringField3, intField',
            |    'NO_INVERTED_INDEX'='longField',
            |    'TABLE_BLOCKSIZE'='8'
            |  )
           """.stripMargin)
    df3.write
          .format("carbondata")
          .option("tableName", "study_carbondata")
          .option("compress", "true")  // only takes effect when tempCSV is true
          .option("tempCSV", "false")
          .option("single_pass", "true")
          .option("sort_scope", "LOCAL_SORT") // GLOBAL_SORT or LOCAL_SORT
          .mode(SaveMode.Append)
          .save()
    df3.write
          .format("carbondata")
          .option("tableName", "study_carbondata")
          .option("compress", "true")  // only takes effect when tempCSV is true
          .option("tempCSV", "false")
          .option("single_pass", "true")
          .option("sort_scope", "LOCAL_SORT") // GLOBAL_SORT or LOCAL_SORT
          .mode(SaveMode.Append)
          .save()
    
    ```
    **the configs in carbon.properties:**
    
    ```
    carbon.enable.auto.load.merge=true
    carbon.compaction.level.threshold=2,4
    carbon.major.compaction.size=1024
    carbon.number.of.cores.while.loading=2
    carbon.number.of.cores.while.compacting=2
    enable.unsafe.columnpage=true
    enable.unsafe.in.query.processing=true
    enable.unsafe.sort=true
    carbon.global.sort.rdd.storage.level=MEMORY_AND_DISK_SER
    ```
    
    I added some logging in the methods 'UnsafeMemoryManager.freeMemory' and 'UnsafeMemoryManager.freeMemoryAll', and found the following:
    
    2017-08-21 15:42:25,240 - [Executor task launch worker for task 12][partitionID:carbondata;queryID:21481913656922] **contains Key: true 21481895071366**
    2017-08-21 15:42:25,241 - [Executor task launch worker for task 12][partitionID:carbondata;queryID:21481913656922] **remove Key: 21481895071366**
    2017-08-21 15:42:25,245 - [Executor task launch worker for task 12][partitionID:carbondata;queryID:21481913656922] **contains Key: false 21481895071366**
    
    **taskId '21481895071366' had already been removed before 'freeMemory' fetched it from 'taskIdToMemoryBlockMap', so the NPE occurred:**
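
    To make the interleaving concrete, here is a minimal, self-contained Java sketch of the race. The class and method names are simplified stand-ins, not the real UnsafeMemoryManager/MemoryBlock types (block sizes stand in for MemoryBlock objects); the guarded 'freeMemory' mirrors the 'containsKey' check from this diff:

    ```java
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Simplified model of the race between freeMemory and freeMemoryAll.
    public class FreeMemoryGuard {
        private static final Map<Long, Set<Long>> taskIdToMemoryBlockMap = new HashMap<>();

        public static synchronized void allocate(long taskId, long block) {
            taskIdToMemoryBlockMap.computeIfAbsent(taskId, k -> new HashSet<>()).add(block);
        }

        // Guarded free, mirroring the containsKey check in the diff: if another
        // path (e.g. freeMemoryAll) already removed the task entry, skip instead
        // of calling remove() on the null returned by get().
        public static synchronized boolean freeMemory(long taskId, long block) {
            if (taskIdToMemoryBlockMap.containsKey(taskId)) {
                taskIdToMemoryBlockMap.get(taskId).remove(block);
                return true;
            }
            return false;
        }

        // Stand-in for freeMemoryAll: drops the whole entry for the task.
        public static synchronized void freeMemoryAll(long taskId) {
            taskIdToMemoryBlockMap.remove(taskId);
        }

        public static void main(String[] args) {
            long taskId = 21481895071366L;   // taskId from the logs above
            allocate(taskId, 8L);
            freeMemoryAll(taskId);           // the interleaving seen in the logs
            // Without the guard this would be taskIdToMemoryBlockMap.get(taskId).remove(8L) -> NPE
            System.out.println(freeMemory(taskId, 8L)); // prints false: free is safely skipped
        }
    }
    ```

    With the unguarded code, the 'get(taskId)' in 'main' returns null once 'freeMemoryAll' has run, which is exactly the NPE in the stack trace below.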
    
    ```
    Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 12, localhost, executor driver): java.lang.NullPointerException
            at org.apache.carbondata.core.memory.UnsafeMemoryManager.freeMemory(UnsafeMemoryManager.java:109)
            at org.apache.carbondata.core.datastore.page.UnsafeFixLengthColumnPage.freeMemory(UnsafeFixLengthColumnPage.java:317)
            at org.apache.carbondata.core.datastore.page.LazyColumnPage.freeMemory(LazyColumnPage.java:202)
            at org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk.freeMemory(MeasureColumnDataChunk.java:70)
            at org.apache.carbondata.core.scan.result.AbstractScannedResult.freeMemory(AbstractScannedResult.java:517)
            at org.apache.carbondata.core.scan.processor.AbstractDataBlockIterator.close(AbstractDataBlockIterator.java:228)
            at org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator.close(AbstractDetailQueryResultIterator.java:306)
            at org.apache.carbondata.core.scan.executor.impl.AbstractQueryExecutor.finish(AbstractQueryExecutor.java:544)
            at org.apache.carbondata.processing.merger.CarbonCompactionExecutor.finish(CarbonCompactionExecutor.java:173)
            at org.apache.carbondata.spark.rdd.CarbonMergerRDD$$anon$1.<init>(CarbonMergerRDD.scala:242)
            at org.apache.carbondata.spark.rdd.CarbonMergerRDD.internalCompute(CarbonMergerRDD.scala:79)
            at org.apache.carbondata.spark.rdd.CarbonRDD.compute(CarbonRDD.scala:61)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
            at org.apache.spark.scheduler.Task.run(Task.scala:99)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
    
    Driver stacktrace:
    java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 12, localhost, executor driver): java.lang.NullPointerException
            ... (same stack trace as above)
    ```
    
    Note: **The NPE occurred here in the compaction phase, and sometimes in the data loading phase in my earlier tests, so I think this is a potential problem.**



