Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 81EE8200CD9 for ; Wed, 19 Jul 2017 14:38:10 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 808EB1675F4; Wed, 19 Jul 2017 12:38:10 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A0AA31675DE for ; Wed, 19 Jul 2017 14:38:09 +0200 (CEST) Received: (qmail 96374 invoked by uid 500); 19 Jul 2017 12:38:08 -0000 Mailing-List: contact issues-help@carbondata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@carbondata.apache.org Delivered-To: mailing list issues@carbondata.apache.org Received: (qmail 96361 invoked by uid 99); 19 Jul 2017 12:38:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Jul 2017 12:38:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A8D4ADFF9F; Wed, 19 Jul 2017 12:38:08 +0000 (UTC) From: ravipesala To: issues@carbondata.apache.org Reply-To: issues@carbondata.apache.org References: In-Reply-To: Subject: [GitHub] carbondata pull request #1185: [WIP]Fixed Concurrent table data loading unsa... 
Content-Type: text/plain Message-Id: <20170719123808.A8D4ADFF9F@git1-us-west.apache.org> Date: Wed, 19 Jul 2017 12:38:08 +0000 (UTC) archived-at: Wed, 19 Jul 2017 12:38:10 -0000 Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1185#discussion_r128232969 --- Diff: core/src/main/java/org/apache/carbondata/core/memory/UnsafeSortMemoryManager.java --- @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.core.memory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.CarbonProperties; + +/** + * Memory manager to keep track of + * all memory for storing the sorted data + */ +public class UnsafeSortMemoryManager { + + /** + * logger + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(UnsafeSortMemoryManager.class.getName()); + + /** + * offheap is enabled + */ + private static boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, + CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)); + + /** + * map to keep taskid to memory blocks + */ + private static Map> taskIdToMemoryBlockMap; + + /** + * singleton instance + */ + public static final UnsafeSortMemoryManager INSTANCE; + + /** + * total memory available for sort data storage + */ + private long totalMemory; + + /** + * current memory used + */ + private long memoryUsed; + + /** + * current memory allocator + */ + private MemoryAllocator allocator; + + static { + long size; + try { + size = Long.parseLong(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB, + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT)); + } catch (Exception e) { + size = Long.parseLong(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT); + LOGGER.info("Wrong memory size given, " + "so setting default value to " + size); + } + if (size < 1024) { + size = 1024; + LOGGER.info("It is not recommended to keep unsafe memory size less than 1024MB, " + + "so setting default value to " + size); + } + + long takenSize = size * 1024 * 1024; + 
MemoryAllocator allocator; + if (offHeap) { + allocator = MemoryAllocator.UNSAFE; + } else { + long maxMemory = Runtime.getRuntime().maxMemory() * 60 / 100; + if (takenSize > maxMemory) { + takenSize = maxMemory; + } + allocator = MemoryAllocator.HEAP; + } + INSTANCE = new UnsafeSortMemoryManager(takenSize, allocator); + taskIdToMemoryBlockMap = new HashMap<>(); + } + + private UnsafeSortMemoryManager(long totalMemory, MemoryAllocator allocator) { + this.totalMemory = totalMemory; + this.allocator = allocator; + LOGGER.info("Sort Memory manager is created with size " + totalMemory + " with " + allocator); + } + + /** + * Below method will be used to check whether memory required is + * available or not + * + * @param required + * @return if memory available + */ + public synchronized boolean isMemoryAvailable(long required) { + return memoryUsed + required < totalMemory; + } + + /** + * Below method will be used to allocate dummy memory + * this will be used to allocate first and then used when u need + * + * @param size + */ + public synchronized void allocateDummyMemory(long size) { + memoryUsed += size; + } + + /** + * Get total memory available + * + * @return amount of memory available + */ + public long getUsableMemory() { + return totalMemory; + } + + public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) { + allocator.free(memoryBlock); + taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock); + memoryUsed -= memoryBlock.size(); + memoryUsed = memoryUsed < 0 ? 
0 : memoryUsed; + } + + public void freeMemoryAll(long taskId) { + Set memoryBlocks = null; + synchronized (INSTANCE) { + memoryBlocks = taskIdToMemoryBlockMap.remove(taskId); + } + if (null != memoryBlocks) { + Iterator iterator = memoryBlocks.iterator(); + while (iterator.hasNext()) { + allocator.free(iterator.next()); + } + } + } + + public synchronized MemoryBlock allocateMemoryLazy(long taskId, long memoryRequested) { + MemoryBlock allocate = allocator.allocate(memoryRequested); --- End diff -- Add a log statement here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---