From: zzcclp
To: issues@carbondata.apache.org
Reply-To: issues@carbondata.apache.org
Subject: [GitHub] carbondata pull request #1266: [CARBONDATA-1393] Avoid to throw NPE when exe...
Date: Mon, 21 Aug 2017 08:11:31 +0000 (UTC)

Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1266#discussion_r134168452

--- Diff: core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java ---

```
@@ -105,13 +105,15 @@ private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequeste
   }
 
   public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) {
-    taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock);
-    allocator.free(memoryBlock);
-    memoryUsed -= memoryBlock.size();
-    memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
-    LOGGER.info(
-        "Freeing memory of size: " + memoryBlock.size() + "available memory: " + (totalMemory -
-            memoryUsed));
+    if (taskIdToMemoryBlockMap.containsKey(taskId)) {
```

--- End diff --

@jackylk , @ravipesala I ran the following code in my IDE:

```
spark.sql("""
  | CREATE TABLE IF NOT EXISTS study_carbondata (
  |   stringField1 string,
  |   stringField2 string,
  |   stringField3 string,
  |   intField int,
  |   longField bigint,
  |   int2Field int
  | )
  | STORED BY 'carbondata'
  | TBLPROPERTIES('DICTIONARY_INCLUDE'='stringField1, stringField2, stringField3, longField',
  |   'SORT_COLUMNS'='stringField1, stringField2, stringField3, intField',
  |   'NO_INVERTED_INDEX'='longField',
  |   'TABLE_BLOCKSIZE'='8'
  | )
""".stripMargin)

df3.write
  .format("carbondata")
  .option("tableName", "study_carbondata")
  .option("compress", "true")          // only valid when tempCSV is true
  .option("tempCSV", "false")
  .option("single_pass", "true")
  .option("sort_scope", "LOCAL_SORT")  // GLOBAL_SORT or LOCAL_SORT
  .mode(SaveMode.Append)
  .save()

// write the same data a second time
df3.write
  .format("carbondata")
  .option("tableName", "study_carbondata")
  .option("compress", "true")          // only valid when tempCSV is true
  .option("tempCSV", "false")
  .option("single_pass", "true")
  .option("sort_scope", "LOCAL_SORT")  // GLOBAL_SORT or LOCAL_SORT
  .mode(SaveMode.Append)
  .save()
```

**The configs in carbon.properties:**

```
carbon.enable.auto.load.merge=true
carbon.compaction.level.threshold=2,4
carbon.major.compaction.size=1024
carbon.number.of.cores.while.loading=2
carbon.number.of.cores.while.compacting=2
enable.unsafe.columnpage=true
enable.unsafe.in.query.processing=true
enable.unsafe.sort=true
carbon.global.sort.rdd.storage.level=MEMORY_AND_DISK_SER
```

I added some logging to the methods 'UnsafeMemoryManager.freeMemory' and 'UnsafeMemoryManager.freeMemoryAll', and found the following:

```
2017-08-21 15:42:25,240 - [Executor task launch worker for task 12][partitionID:carbondata;queryID:21481913656922] contains Key: true 21481895071366
2017-08-21 15:42:25,241 - [Executor task launch worker for task 12][partitionID:carbondata;queryID:21481913656922] remove Key: 21481895071366
2017-08-21 15:42:25,245 - [Executor task launch worker for task 12][partitionID:carbondata;queryID:21481913656922] contains Key: false 21481895071366
```

**taskId '21481895071366' had already been removed before 'freeMemory' looked it up in 'taskIdToMemoryBlockMap', so 'get(taskId)' returned null and the NPE occurred:**

```
Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 12, localhost, executor driver): java.lang.NullPointerException
  at org.apache.carbondata.core.memory.UnsafeMemoryManager.freeMemory(UnsafeMemoryManager.java:109)
  at org.apache.carbondata.core.datastore.page.UnsafeFixLengthColumnPage.freeMemory(UnsafeFixLengthColumnPage.java:317)
  at org.apache.carbondata.core.datastore.page.LazyColumnPage.freeMemory(LazyColumnPage.java:202)
  at org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk.freeMemory(MeasureColumnDataChunk.java:70)
  at org.apache.carbondata.core.scan.result.AbstractScannedResult.freeMemory(AbstractScannedResult.java:517)
  at org.apache.carbondata.core.scan.processor.AbstractDataBlockIterator.close(AbstractDataBlockIterator.java:228)
  at org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator.close(AbstractDetailQueryResultIterator.java:306)
  at org.apache.carbondata.core.scan.executor.impl.AbstractQueryExecutor.finish(AbstractQueryExecutor.java:544)
  at org.apache.carbondata.processing.merger.CarbonCompactionExecutor.finish(CarbonCompactionExecutor.java:173)
  at org.apache.carbondata.spark.rdd.CarbonMergerRDD$$anon$1.<init>(CarbonMergerRDD.scala:242)
  at org.apache.carbondata.spark.rdd.CarbonMergerRDD.internalCompute(CarbonMergerRDD.scala:79)
  at org.apache.carbondata.spark.rdd.CarbonRDD.compute(CarbonRDD.scala:61)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 12, localhost, executor driver): java.lang.NullPointerException
  at org.apache.carbondata.core.memory.UnsafeMemoryManager.freeMemory(UnsafeMemoryManager.java:109)
  at org.apache.carbondata.core.datastore.page.UnsafeFixLengthColumnPage.freeMemory(UnsafeFixLengthColumnPage.java:317)
  at org.apache.carbondata.core.datastore.page.LazyColumnPage.freeMemory(LazyColumnPage.java:202)
  at org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk.freeMemory(MeasureColumnDataChunk.java:70)
  at org.apache.carbondata.core.scan.result.AbstractScannedResult.freeMemory(AbstractScannedResult.java:517)
  at org.apache.carbondata.core.scan.processor.AbstractDataBlockIterator.close(AbstractDataBlockIterator.java:228)
  at org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator.close(AbstractDetailQueryResultIterator.java:306)
  at org.apache.carbondata.core.scan.executor.impl.AbstractQueryExecutor.finish(AbstractQueryExecutor.java:544)
  at org.apache.carbondata.processing.merger.CarbonCompactionExecutor.finish(CarbonCompactionExecutor.java:173)
  at org.apache.carbondata.spark.rdd.CarbonMergerRDD$$anon$1.<init>(CarbonMergerRDD.scala:242)
  at org.apache.carbondata.spark.rdd.CarbonMergerRDD.internalCompute(CarbonMergerRDD.scala:79)
  at org.apache.carbondata.spark.rdd.CarbonRDD.compute(CarbonRDD.scala:61)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
```

Note: **Here the NPE occurs in the compaction phase; in earlier tests it sometimes occurred in the data loading phase as well, so I think this is a general problem.**
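To make the failure mode concrete, here is a minimal, self-contained sketch of the lookup race the log lines above show. The class and map names follow the diff, but everything else is simplified for illustration: `MemoryBlock` is reduced to a `Long`, and the real allocator and memory accounting are omitted. `freeMemoryUnguarded` mirrors the pre-patch code path; `freeMemoryGuarded` mirrors the patched one.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class UnsafeMemoryManagerSketch {
  // taskId -> set of memory blocks held by that task (MemoryBlock simplified to Long)
  private final Map<Long, Set<Long>> taskIdToMemoryBlockMap = new HashMap<>();

  public synchronized void allocate(long taskId, long block) {
    taskIdToMemoryBlockMap.computeIfAbsent(taskId, k -> new HashSet<>()).add(block);
  }

  // Releases everything owned by a task, e.g. on task completion.
  // After this runs, the taskId key is gone from the map.
  public synchronized void freeMemoryAll(long taskId) {
    taskIdToMemoryBlockMap.remove(taskId);
  }

  // Pre-patch behavior: get(taskId) returns null once freeMemoryAll(taskId)
  // has run, so calling remove(block) on the result throws NPE.
  public synchronized void freeMemoryUnguarded(long taskId, long block) {
    taskIdToMemoryBlockMap.get(taskId).remove(block);
  }

  // Patched behavior: the containsKey guard turns a late freeMemory call
  // into a no-op instead of an NPE.
  public synchronized void freeMemoryGuarded(long taskId, long block) {
    if (taskIdToMemoryBlockMap.containsKey(taskId)) {
      taskIdToMemoryBlockMap.get(taskId).remove(block);
    }
  }

  public static void main(String[] args) {
    UnsafeMemoryManagerSketch m = new UnsafeMemoryManagerSketch();
    m.allocate(21481895071366L, 1L);
    m.freeMemoryAll(21481895071366L);        // "remove Key" happens first
    m.freeMemoryGuarded(21481895071366L, 1L); // safe no-op after the guard
    System.out.println("guarded free ok");
    try {
      m.freeMemoryUnguarded(21481895071366L, 1L);
    } catch (NullPointerException e) {
      System.out.println("unguarded free threw NPE");
    }
  }
}
```

Note the guard only helps because both methods are `synchronized` on the same instance; the check-then-act sequence would otherwise still race with `freeMemoryAll`.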