carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject [3/4] incubator-carbondata git commit: [CARBONDATA-484] Implement LRU cache for B-Tree + fixed impacted test cases
Date Tue, 03 Jan 2017 17:53:22 GMT
[CARBONDATA-484] Implement LRU cache for B-Tree + fixed impacted test cases


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/b6ab4ef6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/b6ab4ef6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/b6ab4ef6

Branch: refs/heads/master
Commit: b6ab4ef654ee148a8b9cddf89554e1f46a5759f7
Parents: cb21480
Author: mohammadshahidkhan <mohdshahidkhan1987@gmail.com>
Authored: Wed Oct 5 10:15:09 2016 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Tue Jan 3 23:11:09 2017 +0530

----------------------------------------------------------------------
 .../carbondata/core/cache/CacheProvider.java    |  69 ++-
 .../apache/carbondata/core/cache/CacheType.java |  16 +
 .../carbondata/core/cache/CarbonLRUCache.java   |  81 ++--
 .../datastore/AbstractBlockIndexStoreCache.java | 117 +++++
 .../core/carbon/datastore/BlockIndexStore.java  | 456 +++++++++----------
 .../carbon/datastore/SegmentTaskIndexStore.java | 344 +++++++-------
 .../datastore/TableSegmentUniqueIdentifier.java | 136 ++++++
 .../carbon/datastore/block/AbstractIndex.java   |  70 ++-
 .../core/carbon/datastore/block/BlockInfo.java  |  27 ++
 .../block/SegmentTaskIndexWrapper.java          | 121 +++++
 .../block/TableBlockUniqueIdentifier.java       |  72 +++
 .../core/carbon/path/CarbonTablePath.java       |  15 +-
 .../core/constants/CarbonCommonConstants.java   |  17 +-
 .../apache/carbondata/core/util/CarbonUtil.java |  65 +++
 .../executor/impl/AbstractQueryExecutor.java    |  33 +-
 .../AbstractDetailQueryResultIterator.java      |   1 +
 .../core/cache/CacheProviderTest.java           |   2 +-
 .../dictionary/ForwardDictionaryCacheTest.java  |   2 +-
 .../dictionary/ReverseDictionaryCacheTest.java  |   2 +-
 .../datastore/SegmentTaskIndexStoreTest.java    |  37 +-
 .../carbon/datastore/block/BlockInfoTest.java   |  16 +-
 .../carbondata/core/util/CarbonUtilTest.java    |   2 +-
 .../core/util/DataFileFooterConverterTest.java  |   4 +-
 .../apache/carbondata/hadoop/CacheClient.java   |  96 ++++
 .../carbondata/hadoop/CarbonInputFormat.java    |  50 +-
 .../internal/index/impl/InMemoryBTreeIndex.java |  45 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   3 -
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   3 +
 .../MajorCompactionIgnoreInMinorTest.scala      |  21 +-
 .../carbon/datastore/BlockIndexStoreTest.java   |  78 +++-
 30 files changed, 1443 insertions(+), 558 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
index fa505bf..7d92ca2 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
@@ -22,11 +22,20 @@ package org.apache.carbondata.core.cache;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.cache.dictionary.ForwardDictionaryCache;
 import org.apache.carbondata.core.cache.dictionary.ReverseDictionaryCache;
+import org.apache.carbondata.core.carbon.datastore.BlockIndexStore;
+import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore;
+import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
 
 /**
  * Cache provider class which will create a cache based on given type
@@ -45,15 +54,19 @@ public class CacheProvider {
       new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
   /**
-   * a map that will hold the mapping of cache type to LRU cache instance
+   * object lock instance to be used in synchronization block
    */
-  private Map<CacheType, CarbonLRUCache> cacheTypeToLRUCacheMap =
-      new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  private final Object lock = new Object();
+  /**
+   * LRU cache instance
+   */
+  private CarbonLRUCache carbonLRUCache;
 
   /**
-   * object lock instance to be used in synchronization block
+   * instance for CacheProvider LOGGER
    */
-  private final Object lock = new Object();
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CacheProvider.class.getName());
 
   /**
    * private constructor to follow singleton design pattern for this class
@@ -85,7 +98,7 @@ public class CacheProvider {
     if (!dictionaryCacheAlreadyExists(cacheType)) {
       synchronized (lock) {
         if (!dictionaryCacheAlreadyExists(cacheType)) {
-          if (null == cacheTypeToLRUCacheMap.get(cacheType)) {
+          if (null == carbonLRUCache) {
             createLRULevelCacheInstance(cacheType);
           }
           createDictionaryCacheForGivenType(cacheType, carbonStorePath);
@@ -106,11 +119,18 @@ public class CacheProvider {
     if (cacheType.equals(CacheType.REVERSE_DICTIONARY)) {
       cacheObject =
           new ReverseDictionaryCache<DictionaryColumnUniqueIdentifier, Dictionary>(carbonStorePath,
-              cacheTypeToLRUCacheMap.get(cacheType));
+              carbonLRUCache);
     } else if (cacheType.equals(CacheType.FORWARD_DICTIONARY)) {
       cacheObject =
           new ForwardDictionaryCache<DictionaryColumnUniqueIdentifier, Dictionary>(carbonStorePath,
-              cacheTypeToLRUCacheMap.get(cacheType));
+              carbonLRUCache);
+    } else if (cacheType.equals(cacheType.EXECUTOR_BTREE)) {
+      cacheObject = new BlockIndexStore<TableBlockUniqueIdentifier, AbstractIndex>(carbonStorePath,
+          carbonLRUCache);
+    } else if (cacheType.equals(cacheType.DRIVER_BTREE)) {
+      cacheObject =
+          new SegmentTaskIndexStore<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>(
+              carbonStorePath, carbonLRUCache);
     }
     cacheTypeToCacheMap.put(cacheType, cacheObject);
   }
@@ -121,15 +141,25 @@ public class CacheProvider {
    * @param cacheType
    */
   private void createLRULevelCacheInstance(CacheType cacheType) {
-    CarbonLRUCache carbonLRUCache = null;
-    // if cache type is dictionary cache, then same lru cache instance has to be shared
-    // between forward and reverse cache
-    if (cacheType.equals(CacheType.REVERSE_DICTIONARY) || cacheType
-        .equals(CacheType.FORWARD_DICTIONARY)) {
-      carbonLRUCache = new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_LEVEL_CACHE_SIZE,
-          CarbonCommonConstants.CARBON_MAX_LEVEL_CACHE_SIZE_DEFAULT);
-      cacheTypeToLRUCacheMap.put(CacheType.REVERSE_DICTIONARY, carbonLRUCache);
-      cacheTypeToLRUCacheMap.put(CacheType.FORWARD_DICTIONARY, carbonLRUCache);
+    boolean isDriver = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "false"));
+    if (isDriver) {
+      carbonLRUCache = new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE,
+          CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT);
+    } else {
+      // if executor cache size is not configured then driver cache conf will be used
+      String executorCacheSize = CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE);
+      if (null != executorCacheSize) {
+        carbonLRUCache =
+            new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE,
+                CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT);
+      } else {
+        LOGGER.info(
+            "Executor LRU cache size not configured. Initializing with driver LRU cache size.");
+        carbonLRUCache = new CarbonLRUCache(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE,
+            CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT);
+      }
     }
   }
 
@@ -148,7 +178,10 @@ public class CacheProvider {
    * Below method will be used to clear the cache
    */
   public void dropAllCache() {
-    cacheTypeToLRUCacheMap.clear();
+    if(null != carbonLRUCache) {
+      carbonLRUCache.clear();
+      carbonLRUCache= null;
+    }
     cacheTypeToCacheMap.clear();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
index ea511e9..5cc0282 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
@@ -21,6 +21,10 @@ package org.apache.carbondata.core.cache;
 
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier;
 
 /**
  * class which defines different cache types. cache type can be dictionary cache for
@@ -42,6 +46,18 @@ public class CacheType<K, V> {
       new CacheType("reverse_dictionary");
 
   /**
+   * Executor BTree cache which maintains size of BTree metadata
+   */
+  public static final CacheType<TableBlockUniqueIdentifier, AbstractIndex> EXECUTOR_BTREE =
+      new CacheType("executor_btree");
+
+  /**
+   * Driver BTree cache which maintains size of BTree metadata
+   */
+  public static final CacheType<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
+      DRIVER_BTREE = new CacheType("driver_btree");
+
+  /**
    * cacheName which is unique name for a cache
    */
   private String cacheName;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
index 4ba38e4..ca77c9c 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CarbonLRUCache.java
@@ -70,11 +70,11 @@ public final class CarbonLRUCache {
     }
     initCache();
     if (lruCacheMemorySize > 0) {
-      LOGGER.info("Configured level cahce size is " + lruCacheMemorySize + " MB");
+      LOGGER.info("Configured LRU cache size is " + lruCacheMemorySize + " MB");
       // convert in bytes
       lruCacheMemorySize = lruCacheMemorySize * BYTE_CONVERSION_CONSTANT;
     } else {
-      LOGGER.info("Column cache size not configured. Therefore default behavior will be "
+      LOGGER.info("LRU cache size not configured. Therefore default behavior will be "
               + "considered and no LRU based eviction of columns will be done");
     }
   }
@@ -159,8 +159,10 @@ public final class CarbonLRUCache {
     if (null != cacheable) {
       currentSize = currentSize - cacheable.getMemorySize();
     }
-    lruCacheMap.remove(key);
-    LOGGER.info("Removed level entry from InMemory level lru cache :: " + key);
+    Cacheable remove = lruCacheMap.remove(key);
+    if(null != remove) {
+      LOGGER.info("Removed entry from InMemory lru cache :: " + key);
+    }
   }
 
   /**
@@ -171,25 +173,54 @@ public final class CarbonLRUCache {
    * @param cacheInfo
    */
   public boolean put(String columnIdentifier, Cacheable cacheInfo, long requiredSize) {
+    LOGGER.debug("Required size for entry " + columnIdentifier + " :: " + requiredSize
+        + " Current cache size :: " + currentSize);
     boolean columnKeyAddedSuccessfully = false;
-    if (freeMemorySizeForAddingCache(requiredSize)) {
+    if (isLRUCacheSizeConfigured()) {
       synchronized (lruCacheMap) {
-        currentSize = currentSize + requiredSize;
-        if (null == lruCacheMap.get(columnIdentifier)) {
-          lruCacheMap.put(columnIdentifier, cacheInfo);
+        if (freeMemorySizeForAddingCache(requiredSize)) {
+          currentSize = currentSize + requiredSize;
+          addEntryToLRUCacheMap(columnIdentifier, cacheInfo);
+          columnKeyAddedSuccessfully = true;
+        } else {
+          LOGGER.error(
+              "Size not available. Entry cannot be added to lru cache :: " + columnIdentifier
+                  + " .Required Size = " + requiredSize + " Size available " + (lruCacheMemorySize
+                  - currentSize));
         }
-        columnKeyAddedSuccessfully = true;
       }
-      LOGGER.debug("Added level entry to InMemory level lru cache :: " + columnIdentifier);
     } else {
-      LOGGER.error("Size not available. Column cannot be added to level lru cache :: "
-          + columnIdentifier + " .Required Size = " + requiredSize + " Size available "
-          + (lruCacheMemorySize - currentSize));
+      synchronized (lruCacheMap) {
+        addEntryToLRUCacheMap(columnIdentifier, cacheInfo);
+      }
+      columnKeyAddedSuccessfully = true;
     }
     return columnKeyAddedSuccessfully;
   }
 
   /**
+   * The method will add the cache entry to LRU cache map
+   *
+   * @param columnIdentifier
+   * @param cacheInfo
+   */
+  private void addEntryToLRUCacheMap(String columnIdentifier, Cacheable cacheInfo) {
+    if (null == lruCacheMap.get(columnIdentifier)) {
+      lruCacheMap.put(columnIdentifier, cacheInfo);
+    }
+    LOGGER.debug("Added entry to InMemory lru cache :: " + columnIdentifier);
+  }
+
+  /**
+   * this will check whether the LRU cache size is configured
+   *
+   * @return true if an LRU cache size has been configured, false otherwise
+   */
+  private boolean isLRUCacheSizeConfigured() {
+    return lruCacheMemorySize > 0;
+  }
+
+  /**
    * This method will check a required column can be loaded into memory or not. If required
    * this method will call for eviction of existing data from memory
    *
@@ -198,24 +229,18 @@ public final class CarbonLRUCache {
    */
   private boolean freeMemorySizeForAddingCache(long requiredSize) {
     boolean memoryAvailable = false;
-    if (lruCacheMemorySize > 0) {
+    if (isSizeAvailableToLoadColumnDictionary(requiredSize)) {
+      memoryAvailable = true;
+    } else {
+      // get the keys that can be removed from memory
+      List<String> keysToBeRemoved = getKeysToBeRemoved(requiredSize);
+      for (String cacheKey : keysToBeRemoved) {
+        removeKey(cacheKey);
+      }
+      // after removing the keys check again if required size is available
       if (isSizeAvailableToLoadColumnDictionary(requiredSize)) {
         memoryAvailable = true;
-      } else {
-        synchronized (lruCacheMap) {
-          // get the keys that can be removed from memory
-          List<String> keysToBeRemoved = getKeysToBeRemoved(requiredSize);
-          for (String cacheKey : keysToBeRemoved) {
-            removeKey(cacheKey);
-          }
-          // after removing the keys check again if required size is available
-          if (isSizeAvailableToLoadColumnDictionary(requiredSize)) {
-            memoryAvailable = true;
-          }
-        }
       }
-    } else {
-      memoryAvailable = true;
     }
     return memoryAvailable;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/main/java/org/apache/carbondata/core/carbon/datastore/AbstractBlockIndexStoreCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/AbstractBlockIndexStoreCache.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/AbstractBlockIndexStoreCache.java
new file mode 100644
index 0000000..c700a06
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/AbstractBlockIndexStoreCache.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.carbon.datastore;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.carbon.datastore.block.BlockInfo;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier;
+import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.CarbonUtilException;
+
+/**
+ * This class validate and load the B-Tree in the executor lru cache
+ * @param <K> cache key
+ * @param <V> Block Meta data details
+ */
+public abstract class AbstractBlockIndexStoreCache<K, V>
+    implements Cache<TableBlockUniqueIdentifier, AbstractIndex> {
+  /**
+   * carbon store path
+   */
+  protected String carbonStorePath;
+  /**
+   * CarbonLRU cache
+   */
+  protected CarbonLRUCache lruCache;
+
+  /**
+   * table segment id vs blockInfo list
+   */
+  protected  Map<String, List<BlockInfo>> segmentIdToBlockListMap;
+
+
+  /**
+   * map of block info to lock object map, while loading the btree this will be filled
+   * and removed after loading the tree for that particular block info, this will be useful
+   * while loading the tree concurrently so only block level lock will be applied another
+   * block can be loaded concurrently
+   */
+  protected Map<BlockInfo, Object> blockInfoLock;
+
+  /**
+   * The object will hold the segment ID lock so that at a time only 1 block that belongs to same
+   * segment & table can create the list for holding the block info
+   */
+  protected Map<String, Object> segmentIDLock;
+
+  public AbstractBlockIndexStoreCache(String carbonStorePath, CarbonLRUCache lruCache) {
+    this.carbonStorePath = carbonStorePath;
+    this.lruCache = lruCache;
+    blockInfoLock = new ConcurrentHashMap<BlockInfo, Object>();
+    segmentIDLock= new ConcurrentHashMap<String, Object>();
+    segmentIdToBlockListMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * This method will get the value for the given key. If value does not exist
+   * for the given key, it will check and load the value.
+   *
+   * @param tableBlock
+   * @param tableBlockUniqueIdentifier
+   * @param lruCacheKey
+   */
+  protected void checkAndLoadTableBlocks(AbstractIndex tableBlock,
+      TableBlockUniqueIdentifier tableBlockUniqueIdentifier, String lruCacheKey)
+      throws CarbonUtilException {
+    // calculate the required size
+    TableBlockInfo blockInfo = tableBlockUniqueIdentifier.getTableBlockInfo();
+    long requiredMetaSize = CarbonUtil
+        .calculateMetaSize(blockInfo.getFilePath(), blockInfo.getBlockOffset(),
+            blockInfo.getBlockLength());
+    if (requiredMetaSize > 0) {
+      tableBlock.setMemorySize(requiredMetaSize);
+      tableBlock.incrementAccessCount();
+      boolean isTableBlockAddedToLruCache = lruCache.put(lruCacheKey, tableBlock, requiredMetaSize);
+      // if column is successfully added to lru cache then only load the
+      // table blocks data
+      if (isTableBlockAddedToLruCache) {
+        // load table blocks data
+        // getting the data file meta data of the block
+        DataFileFooter footer = CarbonUtil
+            .readMetadatFile(blockInfo);
+        footer.setBlockInfo(new BlockInfo(blockInfo));
+        // building the block
+        tableBlock.buildIndex(Arrays.asList(footer));
+      } else {
+        throw new CarbonUtilException(
+            "Cannot load table blocks into memory. Not enough memory available");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
index d7ba318..9b5818f 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
@@ -16,113 +16,152 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.carbondata.core.carbon.datastore;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.carbon.datastore.block.BlockIndex;
 import org.apache.carbondata.core.carbon.datastore.block.BlockInfo;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier;
 import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
-import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.CarbonUtilException;
 
 /**
- * Singleton Class to handle loading, unloading,clearing,storing of the table
- * blocks
+ * This class is used to load the B-Tree in Executor LRU Cache
  */
-public class BlockIndexStore {
-
-  /**
-   * singleton instance
-   */
-  private static final BlockIndexStore CARBONTABLEBLOCKSINSTANCE = new BlockIndexStore();
-
-  /**
-   * map to hold the table and its list of blocks
-   */
-  private Map<AbsoluteTableIdentifier, Map<BlockInfo, AbstractIndex>> tableBlocksMap;
-
-  /**
-   * map to maintain segment id to block info map, this map will be used to
-   * while removing the block from memory when segment is compacted or deleted
-   */
-  private Map<AbsoluteTableIdentifier, Map<String, List<BlockInfo>>> segmentIdToBlockListMap;
-
-  /**
-   * map of block info to lock object map, while loading the btree this will be filled
-   * and removed after loading the tree for that particular block info, this will be useful
-   * while loading the tree concurrently so only block level lock will be applied another
-   * block can be loaded concurrently
-   */
-  private Map<BlockInfo, Object> blockInfoLock;
+public class BlockIndexStore<K, V> extends AbstractBlockIndexStoreCache<K, V> {
 
   /**
-   * table and its lock object to this will be useful in case of concurrent
-   * query scenario when more than one query comes for same table and in that
-   * case it will ensure that only one query will able to load the blocks
+   * LOGGER instance
    */
-  private Map<AbsoluteTableIdentifier, Object> tableLockMap;
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BlockIndexStore.class.getName());
+  public BlockIndexStore(String carbonStorePath, CarbonLRUCache lruCache) {
+    super(carbonStorePath, lruCache);
+  }
 
   /**
-   * block info to future task mapping
-   * useful when blocklet distribution is enabled and
-   * same block is loaded by multiple thread
+   * The method loads the block meta in B-tree lru cache and returns the block meta.
+   *
+   * @param tableBlockUniqueIdentifier Uniquely identifies the block
+   * @return returns the blocks B-Tree meta
+   * @throws CarbonUtilException
    */
-  private Map<BlockInfo, Future<AbstractIndex>> mapOfBlockInfoToFuture;
+  @Override public AbstractIndex get(TableBlockUniqueIdentifier tableBlockUniqueIdentifier)
+      throws CarbonUtilException {
+    TableBlockInfo tableBlockInfo = tableBlockUniqueIdentifier.getTableBlockInfo();
+    BlockInfo blockInfo = new BlockInfo(tableBlockInfo);
+    String lruCacheKey =
+        getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo);
+    AbstractIndex tableBlock = (AbstractIndex) lruCache.get(lruCacheKey);
 
-  private BlockIndexStore() {
-    tableBlocksMap = new ConcurrentHashMap<AbsoluteTableIdentifier, Map<BlockInfo, AbstractIndex>>(
-        CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    tableLockMap = new ConcurrentHashMap<AbsoluteTableIdentifier, Object>(
-        CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    blockInfoLock = new ConcurrentHashMap<BlockInfo, Object>();
-    segmentIdToBlockListMap = new ConcurrentHashMap<>();
-    mapOfBlockInfoToFuture = new ConcurrentHashMap<>();
+    // if block is not loaded
+    if (null == tableBlock) {
+      // check any lock object is present in
+      // block info lock map
+      Object blockInfoLockObject = blockInfoLock.get(blockInfo);
+      // if lock object is not present then acquire
+      // the lock in block info lock and add a lock object in the map for
+      // particular block info, added double checking mechanism to add the lock
+      // object so that in case of concurrent queries for the same block info only one lock
+      // object will be added
+      if (null == blockInfoLockObject) {
+        synchronized (blockInfoLock) {
+          // again checking the block info lock, to check whether lock object is present
+          // or not if now also not present then add a lock object
+          blockInfoLockObject = blockInfoLock.get(blockInfo);
+          if (null == blockInfoLockObject) {
+            blockInfoLockObject = new Object();
+            blockInfoLock.put(blockInfo, blockInfoLockObject);
+          }
+        }
+      }
+      //acquire the lock for particular block info
+      synchronized (blockInfoLockObject) {
+        // check again whether block is present or not to avoid the
+        // same block is loaded
+        //more than once in case of concurrent query
+        tableBlock = (AbstractIndex) lruCache.get(
+            getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo));
+        // if still block is not present then load the block
+        if (null == tableBlock) {
+          tableBlock = loadBlock(tableBlockUniqueIdentifier);
+          fillSegmentIdToBlockListMap(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(),
+              blockInfo);
+        }
+      }
+    } else {
+      tableBlock.incrementAccessCount();
+    }
+    return tableBlock;
   }
 
   /**
-   * Return the instance of this class
-   *
-   * @return singleton instance
+   * @param absoluteTableIdentifier
+   * @param blockInfo
    */
-  public static BlockIndexStore getInstance() {
-    return CARBONTABLEBLOCKSINSTANCE;
+  private void fillSegmentIdToBlockListMap(AbsoluteTableIdentifier absoluteTableIdentifier,
+      BlockInfo blockInfo) {
+    TableSegmentUniqueIdentifier segmentIdentifier =
+        new TableSegmentUniqueIdentifier(absoluteTableIdentifier,
+            blockInfo.getTableBlockInfo().getSegmentId());
+    String uniqueTableSegmentIdentifier = segmentIdentifier.getUniqueTableSegmentIdentifier();
+    List<BlockInfo> blockInfos =
+        segmentIdToBlockListMap.get(uniqueTableSegmentIdentifier);
+    if (null == blockInfos) {
+      Object segmentLockObject = segmentIDLock.get(uniqueTableSegmentIdentifier);
+      if (null == segmentLockObject) {
+        synchronized (segmentIDLock) {
+          segmentLockObject = segmentIDLock.get(uniqueTableSegmentIdentifier);
+          if (null == segmentLockObject) {
+            segmentLockObject = new Object();
+            segmentIDLock.put(uniqueTableSegmentIdentifier, segmentLockObject);
+          }
+        }
+      }
+      synchronized (segmentLockObject) {
+        blockInfos =
+            segmentIdToBlockListMap.get(segmentIdentifier.getUniqueTableSegmentIdentifier());
+        if (null == blockInfos) {
+          blockInfos = new CopyOnWriteArrayList<>();
+          segmentIdToBlockListMap.put(uniqueTableSegmentIdentifier, blockInfos);
+        }
+        blockInfos.add(blockInfo);
+      }
+    } else {
+      blockInfos.add(blockInfo);
+    }
   }
 
   /**
-   * below method will be used to load the block which are not loaded and to
-   * get the loaded blocks if all the blocks which are passed is loaded then
-   * it will not load , else it will load.
+   * The method takes list of tableblocks as input and load them in btree lru cache
+   * and returns the list of data blocks meta
    *
-   * @param tableBlocksInfos        list of blocks to be loaded
-   * @param absoluteTableIdentifier absolute Table Identifier to identify the table
-   * @throws IndexBuilderException
+   * @param tableBlocksInfos List of unique table blocks
+   * @return List<AbstractIndex>
+   * @throws CarbonUtilException
    */
-  public List<AbstractIndex> loadAndGetBlocks(List<TableBlockInfo> tableBlocksInfos,
-      AbsoluteTableIdentifier absoluteTableIdentifier) throws IndexBuilderException {
+  @Override public List<AbstractIndex> getAll(List<TableBlockUniqueIdentifier> tableBlocksInfos)
+      throws CarbonUtilException {
     AbstractIndex[] loadedBlock = new AbstractIndex[tableBlocksInfos.size()];
-    addTableLockObject(absoluteTableIdentifier);
-
-    // get the instance
-    Object lockObject = tableLockMap.get(absoluteTableIdentifier);
-    Map<BlockInfo, AbstractIndex> tableBlockMapTemp = null;
     int numberOfCores = 1;
     try {
       numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
@@ -132,110 +171,59 @@ public class BlockIndexStore {
       numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
     }
     ExecutorService executor = Executors.newFixedThreadPool(numberOfCores);
-    // Acquire the lock to ensure only one query is loading the table blocks
-    // if same block is assigned to both the queries
-    List<BlockInfo> blockInfosNeedToLoad = null;
-    synchronized (lockObject) {
-      tableBlockMapTemp = tableBlocksMap.get(absoluteTableIdentifier);
-      // if it is loading for first time
-      if (null == tableBlockMapTemp) {
-        tableBlockMapTemp = new ConcurrentHashMap<BlockInfo, AbstractIndex>();
-        tableBlocksMap.put(absoluteTableIdentifier, tableBlockMapTemp);
-      }
-      blockInfosNeedToLoad = fillSegmentIdToTableInfoMap(tableBlocksInfos, absoluteTableIdentifier);
-    }
-    AbstractIndex tableBlock = null;
-    int counter = -1;
-    for (BlockInfo blockInfo : blockInfosNeedToLoad) {
-      counter++;
-      // if table block is already loaded then do not load
-      // that block
-      tableBlock = tableBlockMapTemp.get(blockInfo);
-      // if block is not loaded
-      if (null == tableBlock) {
-        // check any lock object is present in
-        // block info lock map
-        Object blockInfoLockObject = blockInfoLock.get(blockInfo);
-        // if lock object is not present then acquire
-        // the lock in block info lock and add a lock object in the map for
-        // particular block info, added double checking mechanism to add the lock
-        // object so in case of concurrent query we for same block info only one lock
-        // object will be added
-        if (null == blockInfoLockObject) {
-          synchronized (blockInfoLock) {
-            // again checking the block info lock, to check whether lock object is present
-            // or not if now also not present then add a lock object
-            blockInfoLockObject = blockInfoLock.get(blockInfo);
-            if (null == blockInfoLockObject) {
-              blockInfoLockObject = new Object();
-              blockInfoLock.put(blockInfo, blockInfoLockObject);
-            }
-          }
-        }
-        //acquire the lock for particular block info
-        synchronized (blockInfoLockObject) {
-          // check again whether block is present or not to avoid the
-          // same block is loaded
-          //more than once in case of concurrent query
-          tableBlock = tableBlockMapTemp.get(blockInfo);
-          // if still block is not present then load the block
-          if (null == tableBlock) {
-            if (null == mapOfBlockInfoToFuture.get(blockInfo)) {
-              mapOfBlockInfoToFuture.put(blockInfo, executor
-                  .submit(new BlockLoaderThread(blockInfo, tableBlockMapTemp)));
-            }
-          } else {
-            loadedBlock[counter] = tableBlock;
-          }
-        }
-      } else {
-        // if blocks is already loaded then directly set the block at particular position
-        //so block will be present in sorted order
-        loadedBlock[counter] = tableBlock;
-      }
+    List<Future<AbstractIndex>> blocksList = new ArrayList<Future<AbstractIndex>>();
+    for (TableBlockUniqueIdentifier tableBlockUniqueIdentifier : tableBlocksInfos) {
+      blocksList.add(executor.submit(new BlockLoaderThread(tableBlockUniqueIdentifier)));
     }
     // shutdown the executor gracefully and wait until all the task is finished
     executor.shutdown();
     try {
       executor.awaitTermination(1, TimeUnit.HOURS);
     } catch (InterruptedException e) {
-      throw new IndexBuilderException(e);
+      IndexBuilderException indexBuilderException = new IndexBuilderException(e);
+      throw new CarbonUtilException(indexBuilderException.getMessage(), indexBuilderException);
     }
     // fill the block which were not loaded before to loaded blocks array
-    fillLoadedBlocks(loadedBlock, blockInfosNeedToLoad);
+    fillLoadedBlocks(loadedBlock, blocksList);
     return Arrays.asList(loadedBlock);
   }
 
+  private String getLruCacheKey(AbsoluteTableIdentifier absoluteTableIdentifier,
+      BlockInfo blockInfo) {
+    CarbonTableIdentifier carbonTableIdentifier =
+        absoluteTableIdentifier.getCarbonTableIdentifier();
+    return carbonTableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR
+        + carbonTableIdentifier.getTableName() + CarbonCommonConstants.UNDERSCORE
+        + carbonTableIdentifier.getTableId() + CarbonCommonConstants.FILE_SEPARATOR + blockInfo
+        .getBlockUniqueName();
+  }
+
   /**
-   * Below method will be used to fill segment id to its block mapping map.
-   * it will group all the table block info based on segment id and it will fill
+   * method returns the B-Tree meta
    *
-   * @param tableBlockInfos         table block infos
-   * @param absoluteTableIdentifier absolute table identifier
+   * @param tableBlockUniqueIdentifier Unique table block info
+   * @return
    */
-  private List<BlockInfo> fillSegmentIdToTableInfoMap(List<TableBlockInfo> tableBlockInfos,
-      AbsoluteTableIdentifier absoluteTableIdentifier) {
-    Map<String, List<BlockInfo>> map = segmentIdToBlockListMap.get(absoluteTableIdentifier);
-    if (null == map) {
-      map = new ConcurrentHashMap<String, List<BlockInfo>>();
-      segmentIdToBlockListMap.put(absoluteTableIdentifier, map);
+  @Override public AbstractIndex getIfPresent(
+      TableBlockUniqueIdentifier tableBlockUniqueIdentifier) {
+    BlockInfo blockInfo = new BlockInfo(tableBlockUniqueIdentifier.getTableBlockInfo());
+    BlockIndex cacheable = (BlockIndex) lruCache
+        .get(getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo));
+    if (null != cacheable) {
+      cacheable.incrementAccessCount();
     }
-    BlockInfo temp = null;
-    List<BlockInfo> blockInfosNeedToLoad = new ArrayList<>();
+    return cacheable;
+  }
 
-    for (TableBlockInfo info : tableBlockInfos) {
-      List<BlockInfo> tempTableBlockInfos = map.get(info.getSegmentId());
-      if (null == tempTableBlockInfos) {
-        tempTableBlockInfos = new ArrayList<>();
-        map.put(info.getSegmentId(), tempTableBlockInfos);
-      }
-      temp = new BlockInfo(info);
-      if (!tempTableBlockInfos.contains(temp)) {
-        tempTableBlockInfos.add(temp);
-      }
-      blockInfosNeedToLoad.add(temp);
-    }
-    return blockInfosNeedToLoad;
+  /**
+   * the method removes the entry from cache.
+   *
+   * @param tableBlockUniqueIdentifier
+   */
+  @Override public void invalidate(TableBlockUniqueIdentifier tableBlockUniqueIdentifier) {
+    BlockInfo blockInfo = new BlockInfo(tableBlockUniqueIdentifier.getTableBlockInfo());
+    lruCache
+        .remove(getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo));
   }
 
   /**
@@ -244,124 +232,96 @@ public class BlockIndexStore {
    *
    * @param loadedBlockArray array of blocks which will be filled
    * @param blocksList       blocks loaded in thread
-   * @throws IndexBuilderException in case of any failure
+   * @throws CarbonUtilException in case of any failure
    */
-  private void fillLoadedBlocks(AbstractIndex[] loadedBlockArray, List<BlockInfo> blockInfos)
-      throws IndexBuilderException {
+  private void fillLoadedBlocks(AbstractIndex[] loadedBlockArray,
+      List<Future<AbstractIndex>> blocksList) throws CarbonUtilException {
+    int blockCounter = 0;
+    boolean exceptionOccurred = false;
+    Throwable exceptionRef = null;
     for (int i = 0; i < loadedBlockArray.length; i++) {
-      if (null == loadedBlockArray[i]) {
-        try {
-          loadedBlockArray[i] = mapOfBlockInfoToFuture.get(blockInfos.get(i)).get();
-        } catch (InterruptedException | ExecutionException e) {
-          throw new IndexBuilderException(e);
-        }
+      try {
+        loadedBlockArray[i] = blocksList.get(blockCounter++).get();
+      } catch (Throwable e) {
+        exceptionOccurred = true;
+        exceptionRef = e;
       }
-
+    }
+    if (exceptionOccurred) {
+      LOGGER.error("Block B-Tree loading failed. Clearing the access count of the loaded blocks.");
+      // in case of any failure clear the access count for the valid loaded blocks
+      clearAccessCountForLoadedBlocks(loadedBlockArray);
+      throw new CarbonUtilException("Block B-tree loading failed", exceptionRef);
     }
   }
 
-  private AbstractIndex loadBlock(Map<BlockInfo, AbstractIndex> tableBlockMapTemp,
-      BlockInfo blockInfo) throws CarbonUtilException {
-    AbstractIndex tableBlock;
-    DataFileFooter footer;
-    // getting the data file meta data of the block
-    footer = CarbonUtil.readMetadatFile(blockInfo.getTableBlockInfo());
-    tableBlock = new BlockIndex();
-    footer.setBlockInfo(blockInfo);
-    // building the block
-    tableBlock.buildIndex(Arrays.asList(footer));
-    tableBlockMapTemp.put(blockInfo, tableBlock);
-    // finally remove the lock object from block info lock as once block is loaded
-    // it will not come inside this if condition
-    blockInfoLock.remove(blockInfo);
-    return tableBlock;
+  /**
+   * This method will clear the access count for the loaded blocks
+   *
+   * @param loadedBlockArray
+   */
+  private void clearAccessCountForLoadedBlocks(AbstractIndex[] loadedBlockArray) {
+    for (int i = 0; i < loadedBlockArray.length; i++) {
+      if (null != loadedBlockArray[i]) {
+        loadedBlockArray[i].clear();
+      }
+    }
   }
 
   /**
-   * Method to add table level lock if lock is not present for the table
-   *
-   * @param absoluteTableIdentifier
+   * Thread class which will be used to load the blocks
    */
-  private synchronized void addTableLockObject(AbsoluteTableIdentifier absoluteTableIdentifier) {
-    // add the instance to lock map if it is not present
-    if (null == tableLockMap.get(absoluteTableIdentifier)) {
-      tableLockMap.put(absoluteTableIdentifier, new Object());
+  private class BlockLoaderThread implements Callable<AbstractIndex> {
+    // table  block unique identifier
+    private TableBlockUniqueIdentifier tableBlockUniqueIdentifier;
+
+    private BlockLoaderThread(TableBlockUniqueIdentifier tableBlockUniqueIdentifier) {
+      this.tableBlockUniqueIdentifier = tableBlockUniqueIdentifier;
+    }
+
+    @Override public AbstractIndex call() throws Exception {
+      // load and return the loaded blocks
+      return get(tableBlockUniqueIdentifier);
     }
   }
 
+  private AbstractIndex loadBlock(TableBlockUniqueIdentifier tableBlockUniqueIdentifier)
+      throws CarbonUtilException {
+    AbstractIndex tableBlock = new BlockIndex();
+    BlockInfo blockInfo = new BlockInfo(tableBlockUniqueIdentifier.getTableBlockInfo());
+    String lruCacheKey =
+        getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo);
+    checkAndLoadTableBlocks(tableBlock, tableBlockUniqueIdentifier, lruCacheKey);
+    // finally remove the lock object from block info lock as once block is loaded
+    // it will not come inside this if condition
+    blockInfoLock.remove(blockInfo);
+    return tableBlock;
+  }
+
   /**
    * This will be used to remove a particular blocks useful in case of
    * deletion of some of the blocks in case of retention or may be some other
    * scenario
    *
-   * @param segmentsToBeRemoved     list of segments to be removed
+   * @param segmentIds              list of segment ids whose blocks are to be removed
    * @param absoluteTableIdentifier absolute table identifier
    */
-  public void removeTableBlocks(List<String> segmentsToBeRemoved,
+  public void removeTableBlocks(List<String> segmentIds,
       AbsoluteTableIdentifier absoluteTableIdentifier) {
-    // get the lock object if lock object is not present then it is not
-    // loaded at all
-    // we can return from here
-    Object lockObject = tableLockMap.get(absoluteTableIdentifier);
-    if (null == lockObject) {
-      return;
-    }
-    Map<BlockInfo, AbstractIndex> map = tableBlocksMap.get(absoluteTableIdentifier);
-    // if there is no loaded blocks then return
-    if (null == map || map.isEmpty()) {
+    if (null == segmentIds) {
       return;
     }
-    Map<String, List<BlockInfo>> segmentIdToBlockInfoMap =
-        segmentIdToBlockListMap.get(absoluteTableIdentifier);
-    if (null == segmentIdToBlockInfoMap || segmentIdToBlockInfoMap.isEmpty()) {
-      return;
-    }
-    synchronized (lockObject) {
-      for (String segmentId : segmentsToBeRemoved) {
-        List<BlockInfo> tableBlockInfoList = segmentIdToBlockInfoMap.remove(segmentId);
-        if (null == tableBlockInfoList) {
-          continue;
-        }
-        Iterator<BlockInfo> tableBlockInfoIterator = tableBlockInfoList.iterator();
-        while (tableBlockInfoIterator.hasNext()) {
-          BlockInfo info = tableBlockInfoIterator.next();
-          map.remove(info);
+    for (String segmentId : segmentIds) {
+      TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
+          new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
+      List<BlockInfo> blockInfos = segmentIdToBlockListMap
+          .remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+      if (null != blockInfos) {
+        for (BlockInfo blockInfo : blockInfos) {
+          String lruCacheKey = getLruCacheKey(absoluteTableIdentifier, blockInfo);
+          lruCache.remove(lruCacheKey);
         }
       }
     }
   }
-
-  /**
-   * remove all the details of a table this will be used in case of drop table
-   *
-   * @param absoluteTableIdentifier absolute table identifier to find the table
-   */
-  public void clear(AbsoluteTableIdentifier absoluteTableIdentifier) {
-    // removing all the details of table
-    tableLockMap.remove(absoluteTableIdentifier);
-    tableBlocksMap.remove(absoluteTableIdentifier);
-  }
-
-  /**
-   * Thread class which will be used to load the blocks
-   */
-  private class BlockLoaderThread implements Callable<AbstractIndex> {
-    /**
-     * table block info to block index map
-     */
-    private Map<BlockInfo, AbstractIndex> tableBlockMap;
-
-    // block info
-    private BlockInfo blockInfo;
-
-    private BlockLoaderThread(BlockInfo blockInfo, Map<BlockInfo, AbstractIndex> tableBlockMap) {
-      this.tableBlockMap = tableBlockMap;
-      this.blockInfo = blockInfo;
-    }
-
-    @Override public AbstractIndex call() throws Exception {
-      // load and return the loaded blocks
-      return loadBlock(tableBlockMap, blockInfo);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
index 6ab18bb..83e06e9 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
@@ -29,37 +29,35 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndex;
+import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
 import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath.DataFileUtil;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.CarbonUtilException;
 
 /**
- * Singleton Class to handle loading, unloading,clearing,storing of the table
+ * Class to handle loading, unloading,clearing,storing of the table
  * blocks
  */
-public class SegmentTaskIndexStore {
-
+public class SegmentTaskIndexStore<K, V>
+    implements Cache<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(SegmentTaskIndexStore.class.getName());
   /**
-   * singleton instance
+   * carbon store path
    */
-  private static final SegmentTaskIndexStore SEGMENTTASKINDEXSTORE = new SegmentTaskIndexStore();
-
+  protected String carbonStorePath;
   /**
-   * mapping of table identifier to map of segmentId_taskId to table segment
-   * reason of so many map as each segment can have multiple data file and
-   * each file will have its own btree
+   * CarbonLRU cache
    */
-  private Map<AbsoluteTableIdentifier,
-      Map<String, Map<TaskBucketHolder, AbstractIndex>>> tableSegmentMap;
+  protected CarbonLRUCache lruCache;
 
   /**
    * map of block info to lock object map, while loading the btree this will be filled
@@ -70,28 +68,82 @@ public class SegmentTaskIndexStore {
   private Map<String, Object> segmentLockMap;
 
   /**
-   * table and its lock object to this will be useful in case of concurrent
-   * query scenario when more than one query comes for same table and in  that
-   * case it will ensure that only one query will able to load the blocks
+   * constructor to initialize the SegmentTaskIndexStore
+   *
+   * @param carbonStorePath
+   * @param lruCache
    */
-  private Map<AbsoluteTableIdentifier, Object> tableLockMap;
-
-  private SegmentTaskIndexStore() {
-    tableSegmentMap =
-        new ConcurrentHashMap<>(
-            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    tableLockMap = new ConcurrentHashMap<AbsoluteTableIdentifier, Object>(
-        CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  public SegmentTaskIndexStore(String carbonStorePath, CarbonLRUCache lruCache) {
+    this.carbonStorePath = carbonStorePath;
+    this.lruCache = lruCache;
     segmentLockMap = new ConcurrentHashMap<String, Object>();
   }
 
+  @Override
+  public SegmentTaskIndexWrapper get(TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier)
+      throws CarbonUtilException {
+    SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
+    try {
+      segmentTaskIndexWrapper =
+          loadAndGetTaskIdToSegmentsMap(tableSegmentUniqueIdentifier.getSegmentToTableBlocksInfos(),
+              tableSegmentUniqueIdentifier.getAbsoluteTableIdentifier(),
+              tableSegmentUniqueIdentifier);
+    } catch (IndexBuilderException e) {
+      throw new CarbonUtilException(e.getMessage(), e);
+    }
+    if (null != segmentTaskIndexWrapper) {
+      segmentTaskIndexWrapper.incrementAccessCount();
+    }
+    return segmentTaskIndexWrapper;
+  }
+
   /**
-   * Return the instance of this class
+   * returns all the segments taskId_to_Blocks map wrapper.
    *
-   * @return singleton instance
+   * @param tableSegmentUniqueIdentifiers
+   * @return
+   * @throws CarbonUtilException
    */
-  public static SegmentTaskIndexStore getInstance() {
-    return SEGMENTTASKINDEXSTORE;
+  @Override public List<SegmentTaskIndexWrapper> getAll(
+      List<TableSegmentUniqueIdentifier> tableSegmentUniqueIdentifiers) throws CarbonUtilException {
+    List<SegmentTaskIndexWrapper> segmentTaskIndexWrappers =
+        new ArrayList<>(tableSegmentUniqueIdentifiers.size());
+    try {
+      for (TableSegmentUniqueIdentifier segmentUniqueIdentifier : tableSegmentUniqueIdentifiers) {
+        segmentTaskIndexWrappers.add(get(segmentUniqueIdentifier));
+      }
+    } catch (CarbonUtilException e) {
+      for (SegmentTaskIndexWrapper segmentTaskIndexWrapper : segmentTaskIndexWrappers) {
+        segmentTaskIndexWrapper.clear();
+      }
+      throw new CarbonUtilException("Problem in loading segment blocks.", e);
+    }
+    return segmentTaskIndexWrappers;
+  }
+
+  /**
+   * returns the SegmentTaskIndexWrapper
+   *
+   * @param tableSegmentUniqueIdentifier
+   * @return
+   */
+  @Override public SegmentTaskIndexWrapper getIfPresent(
+      TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier) {
+    SegmentTaskIndexWrapper segmentTaskIndexWrapper = (SegmentTaskIndexWrapper) lruCache
+        .get(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+    if (null != segmentTaskIndexWrapper) {
+      segmentTaskIndexWrapper.incrementAccessCount();
+    }
+    return segmentTaskIndexWrapper;
+  }
+
+  /**
+   * method invalidate the segment cache for segment
+   *
+   * @param tableSegmentUniqueIdentifier
+   */
+  @Override public void invalidate(TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier) {
+    lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
   }
 
   /**
@@ -105,148 +157,97 @@ public class SegmentTaskIndexStore {
   * @return map of task id to segment mapping
    * @throws IndexBuilderException
    */
-  public Map<TaskBucketHolder, AbstractIndex> loadAndGetTaskIdToSegmentsMap(
+  private SegmentTaskIndexWrapper loadAndGetTaskIdToSegmentsMap(
       Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos,
-      AbsoluteTableIdentifier absoluteTableIdentifier) throws IndexBuilderException {
+      AbsoluteTableIdentifier absoluteTableIdentifier,
+      TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier)
+      throws IndexBuilderException, CarbonUtilException {
     // task id to segment map
-    Map<TaskBucketHolder, AbstractIndex> taskIdToTableSegmentMap =
-        new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    addLockObject(absoluteTableIdentifier);
-    Iterator<Entry<String, List<TableBlockInfo>>> iteratorOverSegmentBlocksInfos =
+    Iterator<Map.Entry<String, List<TableBlockInfo>>> iteratorOverSegmentBlocksInfos =
         segmentToTableBlocksInfos.entrySet().iterator();
-    Map<String, Map<TaskBucketHolder, AbstractIndex>> tableSegmentMapTemp =
-        addTableSegmentMap(absoluteTableIdentifier);
     Map<TaskBucketHolder, AbstractIndex> taskIdToSegmentIndexMap = null;
-    String segmentId = null;
-    TaskBucketHolder taskId = null;
+    SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
+    TaskBucketHolder taskBucketHolder = null;
     try {
       while (iteratorOverSegmentBlocksInfos.hasNext()) {
         // segment id to table block mapping
-        Entry<String, List<TableBlockInfo>> next = iteratorOverSegmentBlocksInfos.next();
+        iteratorOverSegmentBlocksInfos.next();
         // group task id to table block info mapping for the segment
         Map<TaskBucketHolder, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
             mappedAndGetTaskIdToTableBlockInfo(segmentToTableBlocksInfos);
         // get the existing map of task id to table segment map
-        segmentId = next.getKey();
         // check if segment is already loaded, if segment is already loaded
         //no need to load the segment block
-        taskIdToSegmentIndexMap = tableSegmentMapTemp.get(segmentId);
-        if (taskIdToSegmentIndexMap == null) {
+        String lruCacheKey = tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier();
+        segmentTaskIndexWrapper = (SegmentTaskIndexWrapper) lruCache.get(lruCacheKey);
+        if (segmentTaskIndexWrapper == null) {
           // get the segment loader lock object this is to avoid
           // same segment is getting loaded multiple times
           // in case of concurrent query
-          Object segmentLoderLockObject = segmentLockMap.get(segmentId);
+          Object segmentLoderLockObject = segmentLockMap.get(lruCacheKey);
           if (null == segmentLoderLockObject) {
-            segmentLoderLockObject = addAndGetSegmentLock(segmentId);
+            segmentLoderLockObject = addAndGetSegmentLock(lruCacheKey);
           }
          // acquire lock to load the segment
           synchronized (segmentLoderLockObject) {
-            taskIdToSegmentIndexMap = tableSegmentMapTemp.get(segmentId);
-            if (null == taskIdToSegmentIndexMap) {
-              // creating a map of task id to table segment
-              taskIdToSegmentIndexMap = new ConcurrentHashMap<TaskBucketHolder, AbstractIndex>();
-              Iterator<Entry<TaskBucketHolder, List<TableBlockInfo>>> iterator =
+            segmentTaskIndexWrapper = (SegmentTaskIndexWrapper) lruCache.get(lruCacheKey);
+            if (null == segmentTaskIndexWrapper) {
+              // creating a map of task id to table segment
+              taskIdToSegmentIndexMap = new HashMap<TaskBucketHolder, AbstractIndex>();
+              segmentTaskIndexWrapper = new SegmentTaskIndexWrapper(taskIdToSegmentIndexMap);
+              Iterator<Map.Entry<TaskBucketHolder, List<TableBlockInfo>>> iterator =
                   taskIdToTableBlockInfoMap.entrySet().iterator();
-              while (iterator.hasNext()) {
-                Entry<TaskBucketHolder, List<TableBlockInfo>> taskToBlockInfoList = iterator.next();
-                taskId = taskToBlockInfoList.getKey();
-                taskIdToSegmentIndexMap.put(taskId,
-                    loadBlocks(taskId, taskToBlockInfoList.getValue(), absoluteTableIdentifier));
+              long requiredSize =
+                  calculateRequiredSize(taskIdToTableBlockInfoMap, absoluteTableIdentifier);
+              segmentTaskIndexWrapper.setMemorySize(requiredSize);
+              boolean isAddedToLruCache =
+                  lruCache.put(lruCacheKey, segmentTaskIndexWrapper, requiredSize);
+              if (isAddedToLruCache) {
+                while (iterator.hasNext()) {
+                  Map.Entry<TaskBucketHolder, List<TableBlockInfo>> taskToBlockInfoList =
+                      iterator.next();
+                  taskBucketHolder = taskToBlockInfoList.getKey();
+                  taskIdToSegmentIndexMap.put(taskBucketHolder,
+                      loadBlocks(taskBucketHolder, taskToBlockInfoList.getValue(),
+                          absoluteTableIdentifier));
+                }
+              } else {
+                throw new IndexBuilderException(
+                    "Can not load the segment. No Enough space available.");
               }
-              tableSegmentMapTemp.put(next.getKey(), taskIdToSegmentIndexMap);
+              //tableSegmentMapTemp.put(next.getKey(), taskIdToSegmentIndexMap);
               // removing from segment lock map as once segment is loaded
               //if concurrent query is coming for same segment
               // it will wait on the lock so after this segment will be already
               //loaded so lock is not required, that is why removing the
               // the lock object as it wont be useful
-              segmentLockMap.remove(segmentId);
+              segmentLockMap.remove(lruCacheKey);
             }
           }
-          taskIdToTableSegmentMap.putAll(taskIdToSegmentIndexMap);
         }
       }
-    } catch (CarbonUtilException e) {
+    } catch (IndexBuilderException e) {
       LOGGER.error("Problem while loading the segment");
       throw new IndexBuilderException(e);
     }
-    return taskIdToTableSegmentMap;
+    return segmentTaskIndexWrapper;
   }
 
-  /**
-   * Below method will be used to get the segment level lock object
-   *
-   * @param segmentId
-   * @return lock object
-   */
-  private synchronized Object addAndGetSegmentLock(String segmentId) {
-    // get the segment lock object if it is present then return
-    // otherwise add the new lock and return
-    Object segmentLoderLockObject = segmentLockMap.get(segmentId);
-    if (null == segmentLoderLockObject) {
-      segmentLoderLockObject = new Object();
-      segmentLockMap.put(segmentId, segmentLoderLockObject);
-    }
-    return segmentLoderLockObject;
-  }
-
-  /**
-   * Below code is to add table lock map which will be used to
-   * add
-   *
-   * @param absoluteTableIdentifier
-   */
-  private synchronized void addLockObject(AbsoluteTableIdentifier absoluteTableIdentifier) {
-    // add the instance to lock map if it is not present
-    if (null == tableLockMap.get(absoluteTableIdentifier)) {
-      tableLockMap.put(absoluteTableIdentifier, new Object());
-    }
-  }
-
-  /**
-   * Below method will be used to get the table segment map
-   * if table segment is not present then it will add and return
-   *
-   * @param absoluteTableIdentifier
-   * @return table segment map
-   */
-  private Map<String, Map<TaskBucketHolder, AbstractIndex>> addTableSegmentMap(
+  private long calculateRequiredSize(
+      Map<TaskBucketHolder, List<TableBlockInfo>> taskIdToTableBlockInfoMap,
       AbsoluteTableIdentifier absoluteTableIdentifier) {
-    // get the instance of lock object
-    Object lockObject = tableLockMap.get(absoluteTableIdentifier);
-    Map<String, Map<TaskBucketHolder, AbstractIndex>> tableSegmentMapTemp =
-        tableSegmentMap.get(absoluteTableIdentifier);
-    if (null == tableSegmentMapTemp) {
-      synchronized (lockObject) {
-        // segment id to task id to table segment map
-        tableSegmentMapTemp = tableSegmentMap.get(absoluteTableIdentifier);
-        if (null == tableSegmentMapTemp) {
-          tableSegmentMapTemp = new ConcurrentHashMap<>();
-          tableSegmentMap.put(absoluteTableIdentifier, tableSegmentMapTemp);
-        }
-      }
+    Iterator<Map.Entry<TaskBucketHolder, List<TableBlockInfo>>> iterator =
+        taskIdToTableBlockInfoMap.entrySet().iterator();
+    TaskBucketHolder taskBucketHolder;
+    long driverBTreeSize = 0;
+    while (iterator.hasNext()) {
+      Map.Entry<TaskBucketHolder, List<TableBlockInfo>> taskToBlockInfoList = iterator.next();
+      taskBucketHolder = taskToBlockInfoList.getKey();
+      driverBTreeSize += CarbonUtil
+          .calculateDriverBTreeSize(taskBucketHolder.taskNo, taskBucketHolder.bucketNumber,
+              taskToBlockInfoList.getValue(), absoluteTableIdentifier);
     }
-    return tableSegmentMapTemp;
-  }
-
-  /**
-   * Below method will be used to load the blocks
-   *
-   * @param tableBlockInfoList
-   * @return loaded segment
-   * @throws CarbonUtilException
-   */
-  private AbstractIndex loadBlocks(TaskBucketHolder holder, List<TableBlockInfo> tableBlockInfoList,
-      AbsoluteTableIdentifier tableIdentifier) throws CarbonUtilException {
-    // all the block of one task id will be loaded together
-    // so creating a list which will have all the data file meta data to of one task
-    List<DataFileFooter> footerList = CarbonUtil
-        .readCarbonIndexFile(holder.taskNo, holder.bucketNumber, tableBlockInfoList,
-            tableIdentifier);
-    AbstractIndex segment = new SegmentTaskIndex();
-    // file path of only first block is passed as it all table block info path of
-    // same task id will be same
-    segment.buildIndex(footerList);
-    return segment;
+    return driverBTreeSize;
   }
 
   /**
@@ -282,60 +283,71 @@ public class SegmentTaskIndexStore {
   }
 
   /**
-   * remove all the details of a table this will be used in case of drop table
+   * Below method will be used to get the segment level lock object
+   *
+   * @param segmentId
+   * @return lock object
+   */
+  private synchronized Object addAndGetSegmentLock(String segmentId) {
+    // return the existing lock object for this segment if present,
+    // otherwise register a new lock object and return it
+    Object segmentLoaderLockObject = segmentLockMap.get(segmentId);
+    if (null == segmentLoaderLockObject) {
+      segmentLoaderLockObject = new Object();
+      segmentLockMap.put(segmentId, segmentLoaderLockObject);
+    }
+    return segmentLoaderLockObject;
+  }
+
+  /**
+   * Below method will be used to load the blocks
    *
-   * @param absoluteTableIdentifier absolute table identifier to find the table
+   * @param tableBlockInfoList
+   * @return loaded segment
+   * @throws CarbonUtilException
    */
-  public void clear(AbsoluteTableIdentifier absoluteTableIdentifier) {
-    // removing all the details of table
-    tableLockMap.remove(absoluteTableIdentifier);
-    tableSegmentMap.remove(absoluteTableIdentifier);
+  private AbstractIndex loadBlocks(TaskBucketHolder taskBucketHolder,
+      List<TableBlockInfo> tableBlockInfoList, AbsoluteTableIdentifier tableIdentifier)
+      throws CarbonUtilException {
+    // all the blocks of one task id are loaded together, so first collect the
+    // data file footers of every block belonging to this task
+    List<DataFileFooter> footerList = CarbonUtil
+        .readCarbonIndexFile(taskBucketHolder.taskNo, taskBucketHolder.bucketNumber,
+            tableBlockInfoList, tableIdentifier);
+    AbstractIndex segment = new SegmentTaskIndex();
+    // build the b-tree index for this task from the collected footers; all
+    // table block info entries of the same task id share the same file path
+    segment.buildIndex(footerList);
+    return segment;
+  }
 
   /**
-   * Below method will be used to remove the segment block based on
+   * Below method will be used to remove the segment based on
    * segment id is passed
    *
    * @param segmentToBeRemoved      segment to be removed
    * @param absoluteTableIdentifier absoluteTableIdentifier
    */
-  public void removeTableBlocks(List<String> segmentToBeRemoved,
+  public void removeSegments(List<String> segmentToBeRemoved,
       AbsoluteTableIdentifier absoluteTableIdentifier) {
-    // get the lock object if lock object is not present then it is not
-    // loaded at all
-    // we can return from here
-    Object lockObject = tableLockMap.get(absoluteTableIdentifier);
-    if (null == lockObject) {
-      return;
-    }
-    // Acquire the lock and remove only those instance which was loaded
-    Map<String, Map<TaskBucketHolder, AbstractIndex>> map =
-        tableSegmentMap.get(absoluteTableIdentifier);
-    // if there is no loaded blocks then return
-    if (null == map) {
-      return;
-    }
     for (String segmentId : segmentToBeRemoved) {
-      map.remove(segmentId);
+      TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
+          new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
+      lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
     }
   }
 
   /**
-   * Below method will be used to check if segment blocks
-   * is already loaded or not
+   * The method clears the access count of table segments
    *
-   * @param absoluteTableIdentifier
-   * @param segmentId
-   * @return is loaded then return the loaded blocks otherwise null
+   * @param tableSegmentUniqueIdentifiers
    */
-  public Map<TaskBucketHolder, AbstractIndex> getSegmentBTreeIfExists(
-      AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId) {
-    Map<String, Map<TaskBucketHolder, AbstractIndex>> tableSegment =
-        tableSegmentMap.get(absoluteTableIdentifier);
-    if (null == tableSegment) {
-      return null;
+  public void clear(List<TableSegmentUniqueIdentifier> tableSegmentUniqueIdentifiers) {
+    for (TableSegmentUniqueIdentifier segmentUniqueIdentifier : tableSegmentUniqueIdentifiers) {
+      SegmentTaskIndexWrapper cacheable = (SegmentTaskIndexWrapper) lruCache
+          .get(segmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+      // guard against NPE: the wrapper may already have been evicted from the
+      // LRU cache, in which case there is no access count left to release
+      if (null != cacheable) {
+        cacheable.clear();
+      }
     }
-    return tableSegment.get(segmentId);
   }
 
   public static class TaskBucketHolder implements Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/main/java/org/apache/carbondata/core/carbon/datastore/TableSegmentUniqueIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/TableSegmentUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/TableSegmentUniqueIdentifier.java
new file mode 100644
index 0000000..ffde93f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/TableSegmentUniqueIdentifier.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.carbon.datastore;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+/**
+ * Class holds the absoluteTableIdentifier and segmentId to uniquely identify a segment
+ */
+public class TableSegmentUniqueIdentifier {
+  /**
+   * table fully qualified identifier
+   */
+  private AbsoluteTableIdentifier absoluteTableIdentifier;
+
+  /**
+   * segment id to tableBlockInfo list map
+   * NOTE(review): field is package-private — confirm whether it should be private
+   */
+  Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos;
+
+  // id of the segment this identifier refers to
+  private String segmentId;
+
+  /**
+   * Constructor to initialize the class instance
+   * @param absoluteTableIdentifier fully qualified table identifier
+   * @param segmentId segment id within the table
+   */
+  public TableSegmentUniqueIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier,
+      String segmentId) {
+    this.absoluteTableIdentifier = absoluteTableIdentifier;
+    this.segmentId = segmentId;
+  }
+
+  /**
+   * Constructor which additionally carries the segment id to block info map
+   * @param absoluteTableIdentifier fully qualified table identifier
+   * @param segmentToTableBlocksInfos segment id to tableBlockInfo list map
+   * @param segmentId segment id within the table
+   */
+  public TableSegmentUniqueIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier,
+      Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos, String segmentId) {
+    this.absoluteTableIdentifier = absoluteTableIdentifier;
+    this.segmentToTableBlocksInfos = segmentToTableBlocksInfos;
+    this.segmentId = segmentId;
+  }
+
+  /**
+   * returns AbsoluteTableIdentifier
+   * @return the fully qualified table identifier
+   */
+  public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
+    return absoluteTableIdentifier;
+  }
+
+  public void setAbsoluteTableIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier) {
+    this.absoluteTableIdentifier = absoluteTableIdentifier;
+  }
+
+  /**
+   *  returns the segment to tableBlockInfo map
+   * @return segment id to tableBlockInfo list map (may be null if not set)
+   */
+  public Map<String, List<TableBlockInfo>> getSegmentToTableBlocksInfos() {
+    return segmentToTableBlocksInfos;
+  }
+
+  /**
+   * set the segment to tableBlockInfo map
+   * @param segmentToTableBlocksInfos segment id to tableBlockInfo list map
+   */
+  public void setSegmentToTableBlocksInfos(
+      Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos) {
+    this.segmentToTableBlocksInfos = segmentToTableBlocksInfos;
+  }
+
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  /**
+   * method returns the id to uniquely identify a key
+   *
+   * @return key of the form databaseName/tableId/segmentId
+   */
+  public String getUniqueTableSegmentIdentifier() {
+    CarbonTableIdentifier carbonTableIdentifier =
+        absoluteTableIdentifier.getCarbonTableIdentifier();
+    return carbonTableIdentifier.getDatabaseName()
+        + CarbonCommonConstants.FILE_SEPARATOR + carbonTableIdentifier
+        .getTableId() + CarbonCommonConstants.FILE_SEPARATOR + segmentId;
+  }
+
+  /**
+   * equals method to compare two objects having same
+   * absoluteIdentifier and segmentId; segmentToTableBlocksInfos is
+   * deliberately excluded from the comparison
+   * @param o object to compare with
+   * @return true when both table identifier and segment id match
+   */
+  @Override public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TableSegmentUniqueIdentifier that = (TableSegmentUniqueIdentifier) o;
+
+    if (!absoluteTableIdentifier.equals(that.absoluteTableIdentifier)) return false;
+    return segmentId.equals(that.segmentId);
+
+  }
+
+  /**
+   * Returns hashcode for the TableSegmentIdentifier, consistent with
+   * equals: derived from the table identifier and segment id only
+   * @return hash code
+   */
+  @Override public int hashCode() {
+    int result = absoluteTableIdentifier.hashCode();
+    result = 31 * result + segmentId.hashCode();
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/AbstractIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/AbstractIndex.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/AbstractIndex.java
index 7e1ed8c..dd712fa 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/AbstractIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/AbstractIndex.java
@@ -19,11 +19,13 @@
 package org.apache.carbondata.core.carbon.datastore.block;
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.carbondata.core.cache.Cacheable;
 import org.apache.carbondata.core.carbon.datastore.DataRefNode;
 import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
 
-public abstract class AbstractIndex {
+public abstract class AbstractIndex implements Cacheable {
 
   /**
    * vo class which will hold the RS information of the block
@@ -41,6 +43,16 @@ public abstract class AbstractIndex {
   protected long totalNumberOfRows;
 
   /**
+   * atomic integer to maintain the access count for a column access
+   */
+  protected AtomicInteger accessCount = new AtomicInteger();
+
+  /**
+   * Table block meta size.
+   */
+  protected long memorySize;
+
+  /**
    * @return the totalNumberOfRows
    */
   public long getTotalNumberOfRows() {
@@ -61,10 +73,64 @@ public abstract class AbstractIndex {
     return dataRefNode;
   }
 
+  @Override public long getFileTimeStamp() {
+    return 0;
+  }
+
   /**
    * Below method will be used to load the data block
    *
-   * @param blockInfo block detail
+   * @param footerList footer list
    */
   public abstract void buildIndex(List<DataFileFooter> footerList);
+
+  /**
+   * the method returns the current access count of this index
+   *
+   * @return number of consumers currently holding this index
+   */
+  @Override public int getAccessCount() {
+    return accessCount.get();
+  }
+
+  /**
+   * The method returns the in-memory size of this index
+   *
+   * @return memory size in bytes as set via setMemorySize
+   */
+  @Override public long getMemorySize() {
+    return this.memorySize;
+  }
+
+  /**
+   * The method increments the access count by 1; callers must pair this
+   * with clear() once they are done with the index
+   */
+  public void incrementAccessCount() {
+    accessCount.incrementAndGet();
+  }
+
+  /**
+   * This method will release the objects and set default value for primitive types
+   */
+  public void clear() {
+    decrementAccessCount();
+  }
+
+  /**
+   * This method will decrement the access count by 1 whenever
+   * usage of this index is complete.
+   * NOTE(review): the get()/decrementAndGet() pair is not atomic — two
+   * concurrent callers seeing count 1 could drive it to -1; confirm callers
+   * serialize release or consider a CAS loop.
+   */
+  private void decrementAccessCount() {
+    if (accessCount.get() > 0) {
+      accessCount.decrementAndGet();
+    }
+  }
+
+  /**
+   * the method is used to set the memory size of the b-tree
+   * @param memorySize size in bytes occupied by this index
+   */
+  public void setMemorySize(long memorySize) {
+    this.memorySize = memorySize;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfo.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfo.java
index 96879aa..2a49daf 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfo.java
@@ -18,6 +18,9 @@
  */
 package org.apache.carbondata.core.carbon.datastore.block;
 
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
 /**
  * Below class will be used to store table block info
  * As in blocklet distribution we are dividing the same block
@@ -32,6 +35,10 @@ public class BlockInfo {
    * about the block
    */
   private TableBlockInfo info;
+  /**
+   * unique blockName
+   */
+  private String blockUniqueName;
 
   /**
    * Constructor
@@ -40,6 +47,18 @@ public class BlockInfo {
    */
   public BlockInfo(TableBlockInfo info) {
     this.info = info;
+    init();
+  }
+
+  /**
+   * init the block unique name: composed as
+   * segmentId + FILE_SEPARATOR + carbondata file name (without extension)
+   */
+  private void init() {
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append(this.info.getSegmentId());
+    stringBuilder.append(CarbonCommonConstants.FILE_SEPARATOR);
+    stringBuilder.append(CarbonTablePath.getCarbonDataFileName(this.info.getFilePath()));
+    this.blockUniqueName = stringBuilder.toString();
   }
 
   /**
@@ -104,4 +123,12 @@ public class BlockInfo {
     }
     return true;
   }
+
+  /**
+   * returns unique blockname
+   * @return
+   */
+  public String getBlockUniqueName() {
+    return blockUniqueName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentTaskIndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentTaskIndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentTaskIndexWrapper.java
new file mode 100644
index 0000000..cd278b5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentTaskIndexWrapper.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.carbon.datastore.block;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore;
+
+/**
+ * SegmentTaskIndexWrapper class holds the  taskIdToTableSegmentMap
+ */
+public class SegmentTaskIndexWrapper implements Cacheable {
+
+  /**
+   * task_id to table segment index map
+   */
+  private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> taskIdToTableSegmentMap;
+  /**
+   * atomic counter tracking how many consumers currently hold this wrapper
+   */
+  protected AtomicInteger accessCount = new AtomicInteger();
+
+  /**
+   * In-memory size of the wrapped segment index, in bytes.
+   */
+  protected long memorySize;
+
+  public SegmentTaskIndexWrapper(
+      Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> taskIdToTableSegmentMap) {
+    this.taskIdToTableSegmentMap = taskIdToTableSegmentMap;
+  }
+
+  public Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getTaskIdToTableSegmentMap() {
+    return taskIdToTableSegmentMap;
+  }
+
+  public void setTaskIdToTableSegmentMap(
+      Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> taskIdToTableSegmentMap) {
+    this.taskIdToTableSegmentMap = taskIdToTableSegmentMap;
+  }
+
+  /**
+   * sets the in-memory size of the wrapped segment index
+   *
+   * @param memorySize size in bytes
+   */
+  public void setMemorySize(long memorySize) {
+    this.memorySize = memorySize;
+  }
+
+  /**
+   * returns the timestamp; always 0 — this wrapper does not track a
+   * file timestamp
+   *
+   * @return 0
+   */
+  @Override public long getFileTimeStamp() {
+    return 0;
+  }
+
+  /**
+   * returns the access count
+   *
+   * @return number of consumers currently holding this wrapper
+   */
+  @Override public int getAccessCount() {
+    return accessCount.get();
+  }
+
+  /**
+   * returns the memory size
+   *
+   * @return size in bytes as set via setMemorySize
+   */
+  @Override public long getMemorySize() {
+    return memorySize;
+  }
+
+  /**
+   * The method increments the access count by 1; callers must pair this
+   * with clear() once they are done with the wrapper
+   */
+  public void incrementAccessCount() {
+    accessCount.incrementAndGet();
+  }
+
+  /**
+   * This method will release the objects and set default value for primitive types
+   */
+  public void clear() {
+    decrementAccessCount();
+  }
+
+  /**
+   * This method will decrement the access count by 1 whenever
+   * usage of this wrapper is complete.
+   * NOTE(review): the get()/decrementAndGet() pair is not atomic — two
+   * concurrent callers seeing count 1 could drive it to -1; confirm callers
+   * serialize release or consider a CAS loop.
+   */
+  private void decrementAccessCount() {
+    if (accessCount.get() > 0) {
+      accessCount.decrementAndGet();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockUniqueIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockUniqueIdentifier.java
new file mode 100644
index 0000000..6e57e0f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockUniqueIdentifier.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.carbon.datastore.block;
+
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+/**
+ * Class : Holds the info to uniquely identify a block — the fully
+ * qualified table identifier plus the block info.
+ */
+public class TableBlockUniqueIdentifier {
+
+  /**
+   * table fully qualified name
+   */
+  private AbsoluteTableIdentifier absoluteTableIdentifier;
+
+  /**
+   * table block info
+   */
+  private TableBlockInfo tableBlockInfo;
+
+  public TableBlockUniqueIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier,
+      TableBlockInfo tableBlockInfo) {
+    this.absoluteTableIdentifier = absoluteTableIdentifier;
+    this.tableBlockInfo = tableBlockInfo;
+  }
+
+  public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
+    return absoluteTableIdentifier;
+  }
+
+  public void setAbsoluteTableIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier) {
+    this.absoluteTableIdentifier = absoluteTableIdentifier;
+  }
+
+  public TableBlockInfo getTableBlockInfo() {
+    return tableBlockInfo;
+  }
+
+  public void setTableBlockInfo(TableBlockInfo tableBlockInfo) {
+    this.tableBlockInfo = tableBlockInfo;
+  }
+
+  @Override public int hashCode() {
+    return this.absoluteTableIdentifier.hashCode() + this.tableBlockInfo.hashCode();
+  }
+
+  @Override public boolean equals(Object other) {
+    if (this == other) return true;
+    if (other == null || getClass() != other.getClass()) return false;
+    TableBlockUniqueIdentifier tableBlockUniqueIdentifier = (TableBlockUniqueIdentifier) other;
+    return this.absoluteTableIdentifier.equals(tableBlockUniqueIdentifier.absoluteTableIdentifier)
+        && this.tableBlockInfo.equals(tableBlockUniqueIdentifier.tableBlockInfo);
+  }
+
+  /**
+   * returns the String value to uniquely identify a block, of the form
+   * databaseName/tableName/segmentId/blockInfoHash
+   *
+   * @return unique block key
+   */
+  public String getUniqueTableBlockName() {
+    BlockInfo blockInfo = new BlockInfo(this.tableBlockInfo);
+    CarbonTableIdentifier carbonTableIdentifier =
+        this.absoluteTableIdentifier.getCarbonTableIdentifier();
+    // BUGFIX: second path component must be the table name; the original
+    // copy-pasted getDatabaseName() twice, so two tables of the same
+    // database produced overlapping key prefixes
+    String uniqueTableBlockName = carbonTableIdentifier.getDatabaseName()
+        + CarbonCommonConstants.FILE_SEPARATOR + carbonTableIdentifier
+        .getTableName() + CarbonCommonConstants.FILE_SEPARATOR
+        + this.tableBlockInfo.getSegmentId()
+        + CarbonCommonConstants.FILE_SEPARATOR + blockInfo.hashCode();
+    return uniqueTableBlockName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
index cda971a..8c9f297 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
@@ -428,4 +428,17 @@ public class CarbonTablePath extends Path {
     });
     return files;
   }
-}
\ No newline at end of file
+
+  /**
+   * returns the carbondata file name (without the CARBON_DATA_EXT suffix)
+   * from a full carbondata file path
+   *
+   * @param carbonDataFilePath carbondata file path
+   * @return bare file name between the last path separator and the extension
+   */
+  public static String getCarbonDataFileName(String carbonDataFilePath) {
+    // use lastIndexOf for the extension as well: with indexOf, a parent
+    // directory name containing the extension string would yield wrong
+    // substring bounds (or -1 -> StringIndexOutOfBoundsException)
+    String carbonDataFileName = carbonDataFilePath
+        .substring(carbonDataFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR) + 1,
+            carbonDataFilePath.lastIndexOf(CARBON_DATA_EXT));
+    return carbonDataFileName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b6ab4ef6/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index f766042..6c159e6 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -557,13 +557,18 @@ public final class CarbonCommonConstants {
   public static final String SCHEMAS_MODIFIED_TIME_FILE = "modifiedTime.mdt";
   public static final String DEFAULT_INVISIBLE_DUMMY_MEASURE = "default_dummy_measure";
   /**
-   * max level cache size upto which level cache will be loaded in memory
+   * max driver lru cache size upto which lru cache will be loaded in memory
    */
-  public static final String CARBON_MAX_LEVEL_CACHE_SIZE = "carbon.max.level.cache.size";
+  public static final String CARBON_MAX_DRIVER_LRU_CACHE_SIZE = "carbon.max.driver.lru.cache.size";
   /**
-   * max level cache size default value in GB
+   * max executor lru cache size upto which lru cache will be loaded in memory
    */
-  public static final String CARBON_MAX_LEVEL_CACHE_SIZE_DEFAULT = "-1";
+  public static final String CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE =
+      "carbon.max.executor.lru.cache.size";
+  /**
+   * max lru cache size default value in MB
+   */
+  public static final String CARBON_MAX_LRU_CACHE_SIZE_DEFAULT = "-1";
   /**
    * DOUBLE_VALUE_MEASURE
    */
@@ -991,6 +996,10 @@ public final class CarbonCommonConstants {
    */
   public static final String DICTIONARY_SERVER_PORT_DEFAULT = "2030";
 
+  /**
+   * property to set is IS_DRIVER_INSTANCE
+   */
+  public static final String IS_DRIVER_INSTANCE = "is.driver.instance";
   private CarbonCommonConstants() {
   }
 }



Mime
View raw message