carbondata-commits mailing list archives

From: ravipes...@apache.org
Subject: [27/50] incubator-carbondata git commit: Problem: After drop table dictionary and BTree instances are not getting cleared from driver memory. Due to this memory will keep growing and after some time GC problems will occur. In real case scenarios usually
Date: Tue, 18 Apr 2017 12:04:03 GMT
Problem: After a table is dropped, the dictionary and BTree instances are not cleared from driver
memory. Because of this, driver memory keeps growing and GC problems eventually occur. In
real scenarios the driver memory is usually on the lower side, so it is more prone to GC problems.

Solution:
1. When a table is dropped, clear its BTree and dictionary instances from the LRU cache
(a usage sketch follows below).
2. Clear the access count for each segment immediately after block pruning, rather than loading
all the segments first and clearing the access counts for all of them together at the end.
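
For illustration only, a minimal sketch of how a caller might invoke the new cleanup API from a
drop-table flow, assuming the CarbonTable instance has already been looked up from the metastore.
The helper class DropTableCacheCleanup is hypothetical; the actual integration in this commit is
the CarbonDropTableCommand change shown further down in the diff.

import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

// Hypothetical helper illustrating the cleanup path introduced by this commit.
public final class DropTableCacheCleanup {

  private DropTableCacheCleanup() {
  }

  // Clears driver-side BTree and dictionary entries for a table that is about
  // to be dropped, mirroring the call added to CarbonDropTableCommand.
  public static void clearDriverCaches(CarbonTable carbonTable) {
    if (null != carbonTable) {
      // Evicts the BTree index entry of every loaded segment and the forward
      // and reverse dictionary entries of every dimension from the LRU cache.
      ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable);
    }
  }
}

Point 2 of the solution maps to the new CacheAccessClient.clearAccessCount(...) method, which
CarbonInputFormat now calls from a finally block right after pruning the blocks of a segment, so
each segment's access count is released without waiting for all segments to be processed.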


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/673f8c22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/673f8c22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/673f8c22

Branch: refs/heads/branch-1.1
Commit: 673f8c2263c94e3475552de4da06327029f06eb3
Parents: 144620d
Author: manishgupta88 <tomanishgupta18@gmail.com>
Authored: Tue Apr 11 12:21:35 2017 +0530
Committer: manishgupta88 <tomanishgupta18@gmail.com>
Committed: Thu Apr 13 21:13:08 2017 +0530

----------------------------------------------------------------------
 .../core/cache/dictionary/ManageDictionary.java | 108 -------------
 .../dictionary/ManageDictionaryAndBTree.java    | 158 +++++++++++++++++++
 .../carbondata/hadoop/CacheAccessClient.java    |  23 ++-
 .../carbondata/hadoop/CarbonInputFormat.java    | 153 +++++++++---------
 .../spark/rdd/AlterTableDropColumnRDD.scala     |   4 +-
 .../execution/command/carbonTableSchema.scala   |   8 +-
 .../carbondata/CarbonDataSourceSuite.scala      |   2 +-
 7 files changed, 269 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/673f8c22/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java
deleted file mode 100644
index 0a38890..0000000
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.cache.dictionary;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.ColumnIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-/**
- * This class is aimed at managing dictionary files for any new addition and deletion
- * and calling of clear cache for the non existing dictionary files
- */
-public class ManageDictionary {
-
-  /**
-   * Attribute for Carbon LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(ManageDictionary.class.getName());
-
-  /**
-   * This method will delete the dictionary files for the given column IDs and
-   * clear the dictionary cache
-   *
-   * @param columnSchema
-   * @param carbonTableIdentifier
-   * @param storePath
-   */
-  public static void deleteDictionaryFileAndCache(final ColumnSchema columnSchema,
-      CarbonTableIdentifier carbonTableIdentifier, String storePath) {
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier);
-    String metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath();
-    CarbonFile metadataDir = FileFactory
-        .getCarbonFile(metadataDirectoryPath, FileFactory.getFileType(metadataDirectoryPath));
-    // sort index file is created with dictionary size appended to it. So all the files
-    // with a given column ID need to be listed
-    CarbonFile[] listFiles = metadataDir.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile path) {
-        if (path.getName().startsWith(columnSchema.getColumnUniqueId())) {
-          return true;
-        }
-        return false;
-      }
-    });
-    for (CarbonFile file : listFiles) {
-      // try catch is inside for loop because even if one deletion fails, other files
-      // still need to be deleted
-      try {
-        FileFactory
-            .deleteFile(file.getCanonicalPath(), FileFactory.getFileType(file.getCanonicalPath()));
-      } catch (IOException e) {
-        LOGGER.error("Failed to delete dictionary or sortIndex file for column " + columnSchema
-            .getColumnName() + "with column ID " + columnSchema.getColumnUniqueId());
-      }
-    }
-    // remove dictionary cache
-    removeDictionaryColumnFromCache(carbonTableIdentifier, storePath,
-        columnSchema.getColumnUniqueId());
-  }
-
-  /**
-   * This method will remove dictionary cache from driver for both reverse and forward dictionary
-   *
-   * @param carbonTableIdentifier
-   * @param storePath
-   * @param columnId
-   */
-  public static void removeDictionaryColumnFromCache(CarbonTableIdentifier carbonTableIdentifier,
-      String storePath, String columnId) {
-    Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache =
-        CacheProvider.getInstance().createCache(CacheType.REVERSE_DICTIONARY, storePath);
-    DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
-        new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
-            new ColumnIdentifier(columnId, null, null));
-    dictCache.invalidate(dictionaryColumnUniqueIdentifier);
-    dictCache = CacheProvider.getInstance().createCache(CacheType.FORWARD_DICTIONARY, storePath);
-    dictCache.invalidate(dictionaryColumnUniqueIdentifier);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/673f8c22/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
new file mode 100644
index 0000000..a50bf15
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.cache.dictionary;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * This class is aimed at managing dictionary files for any new addition and deletion
+ * and calling of clear cache for BTree and dictionary instances from LRU cache
+ */
+public class ManageDictionaryAndBTree {
+
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(ManageDictionaryAndBTree.class.getName());
+
+  /**
+   * This method will delete the dictionary files for the given column IDs and
+   * clear the dictionary cache
+   *
+   * @param columnSchema
+   * @param carbonTableIdentifier
+   * @param storePath
+   */
+  public static void deleteDictionaryFileAndCache(final ColumnSchema columnSchema,
+      CarbonTableIdentifier carbonTableIdentifier, String storePath) {
+    CarbonTablePath carbonTablePath =
+        CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier);
+    String metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath();
+    CarbonFile metadataDir = FileFactory
+        .getCarbonFile(metadataDirectoryPath, FileFactory.getFileType(metadataDirectoryPath));
+    // sort index file is created with dictionary size appended to it. So all the files
+    // with a given column ID need to be listed
+    CarbonFile[] listFiles = metadataDir.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile path) {
+        if (path.getName().startsWith(columnSchema.getColumnUniqueId())) {
+          return true;
+        }
+        return false;
+      }
+    });
+    for (CarbonFile file : listFiles) {
+      // try catch is inside for loop because even if one deletion fails, other files
+      // still need to be deleted
+      try {
+        FileFactory
+            .deleteFile(file.getCanonicalPath(), FileFactory.getFileType(file.getCanonicalPath()));
+      } catch (IOException e) {
+        LOGGER.error("Failed to delete dictionary or sortIndex file for column " + columnSchema
+            .getColumnName() + "with column ID " + columnSchema.getColumnUniqueId());
+      }
+    }
+    // remove dictionary cache
+    removeDictionaryColumnFromCache(carbonTableIdentifier, storePath,
+        columnSchema.getColumnUniqueId());
+  }
+
+  /**
+   * This method will remove dictionary cache from driver for both reverse and forward dictionary
+   *
+   * @param carbonTableIdentifier
+   * @param storePath
+   * @param columnId
+   */
+  public static void removeDictionaryColumnFromCache(CarbonTableIdentifier carbonTableIdentifier,
+      String storePath, String columnId) {
+    Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache =
+        CacheProvider.getInstance().createCache(CacheType.REVERSE_DICTIONARY, storePath);
+    DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
+        new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
+            new ColumnIdentifier(columnId, null, null));
+    dictCache.invalidate(dictionaryColumnUniqueIdentifier);
+    dictCache = CacheProvider.getInstance().createCache(CacheType.FORWARD_DICTIONARY, storePath);
+    dictCache.invalidate(dictionaryColumnUniqueIdentifier);
+  }
+
+  /**
+   * This method will invalidate both BTree and dictionary instances from LRU cache
+   *
+   * @param carbonTable
+   */
+  public static void clearBTreeAndDictionaryLRUCache(CarbonTable carbonTable) {
+    // clear Btree cache from LRU cache
+    LoadMetadataDetails[] loadMetadataDetails =
+        SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath());
+    if (null != loadMetadataDetails) {
+      String[] segments = new String[loadMetadataDetails.length];
+      int i = 0;
+      for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) {
+        segments[i++] = loadMetadataDetail.getLoadName();
+      }
+      invalidateBTreeCache(carbonTable.getAbsoluteTableIdentifier(), segments);
+    }
+    // clear dictionary cache from LRU cache
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+    for (CarbonDimension dimension : dimensions) {
+      removeDictionaryColumnFromCache(carbonTable.getCarbonTableIdentifier(),
+          carbonTable.getStorePath(), dimension.getColumnId());
+    }
+  }
+
+  /**
+   * This method will remove the BTree instances from LRU cache
+   *
+   * @param absoluteTableIdentifier
+   * @param segments
+   */
+  public static void invalidateBTreeCache(AbsoluteTableIdentifier absoluteTableIdentifier,
+      String[] segments) {
+    Cache<Object, Object> driverBTreeCache = CacheProvider.getInstance()
+        .createCache(CacheType.DRIVER_BTREE, absoluteTableIdentifier.getStorePath());
+    for (String segmentNo : segments) {
+      TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
+          new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentNo);
+      driverBTreeCache.invalidate(tableSegmentUniqueIdentifier);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/673f8c22/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java
index 85ecfb8..e8d292d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java
@@ -18,7 +18,9 @@ package org.apache.carbondata.hadoop;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -30,7 +32,7 @@ public class CacheAccessClient<K, V> {
   /**
    * List of segments
    */
-  private List<K> segmentList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  private Set<K> segmentSet = new HashSet<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
   private Cache<K, V> cache;
 
@@ -48,7 +50,7 @@ public class CacheAccessClient<K, V> {
   public V getIfPresent(K key) {
     V value = cache.getIfPresent(key);
     if (value != null) {
-      segmentList.add(key);
+      segmentSet.add(key);
     }
     return value;
   }
@@ -64,7 +66,7 @@ public class CacheAccessClient<K, V> {
   public V get(K key) throws IOException {
     V value = cache.get(key);
     if (value != null) {
-      segmentList.add(key);
+      segmentSet.add(key);
     }
     return value;
   }
@@ -73,7 +75,9 @@ public class CacheAccessClient<K, V> {
    * the method is used to clear access count of the unused segments cacheable object
    */
   public void close() {
-    cache.clearAccessCount(segmentList);
+    List<K> segmentArrayList = new ArrayList<>(segmentSet.size());
+    segmentArrayList.addAll(segmentSet);
+    cache.clearAccessCount(segmentArrayList);
     cache = null;
   }
 
@@ -88,4 +92,15 @@ public class CacheAccessClient<K, V> {
     }
   }
 
+  /**
+   * This method will clear the access count for a given list of segments
+   *
+   * @param segmentList
+   */
+  public void clearAccessCount(List<K> segmentList) {
+    cache.clearAccessCount(segmentList);
+    // remove from segment set so that access count is not decremented again during close operation
+    segmentSet.removeAll(segmentList);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/673f8c22/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index b330f12..b1a383e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -223,56 +223,60 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   @Override public List<InputSplit> getSplits(JobContext job) throws IOException {
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
     CacheClient cacheClient = new CacheClient(identifier.getStorePath());
-    List<String> invalidSegments = new ArrayList<>();
-    List<UpdateVO> invalidTimestampsList = new ArrayList<>();
-
-    // get all valid segments and set them into the configuration
-    if (getSegmentsToAccess(job).length == 0) {
-      SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
-      SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
-              segmentStatusManager.getValidAndInvalidSegments();
-      SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
-      setSegmentsToAccess(job.getConfiguration(), segments.getValidSegments());
-      if (segments.getValidSegments().size() == 0) {
-        return new ArrayList<>(0);
+    try {
+      List<String> invalidSegments = new ArrayList<>();
+      List<UpdateVO> invalidTimestampsList = new ArrayList<>();
+
+      // get all valid segments and set them into the configuration
+      if (getSegmentsToAccess(job).length == 0) {
+        SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
+        SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
+            segmentStatusManager.getValidAndInvalidSegments();
+        SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
+        setSegmentsToAccess(job.getConfiguration(), segments.getValidSegments());
+        if (segments.getValidSegments().size() == 0) {
+          return new ArrayList<>(0);
+        }
+
+        // remove entry in the segment index if there are invalid segments
+        invalidSegments.addAll(segments.getInvalidSegments());
+        for (String invalidSegmentId : invalidSegments) {
+          invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
+        }
+        if (invalidSegments.size() > 0) {
+          List<TableSegmentUniqueIdentifier> invalidSegmentsIds =
+              new ArrayList<>(invalidSegments.size());
+          for (String segId : invalidSegments) {
+            invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(identifier, segId));
+          }
+          cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds);
+        }
       }
 
-      // remove entry in the segment index if there are invalid segments
-      invalidSegments.addAll(segments.getInvalidSegments());
-      for (String invalidSegmentId : invalidSegments) {
-        invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
+      // process and resolve the expression
+      Expression filter = getFilterPredicates(job.getConfiguration());
+      CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
+      // this will be null in case of corrupt schema file.
+      if (null == carbonTable) {
+        throw new IOException("Missing/Corrupt schema file for table.");
       }
+      CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+      FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier);
+
+      // do block filtering and get split
+      List<InputSplit> splits = getSplits(job, filterInterface, cacheClient);
+      // pass the invalid segment to task side in order to remove index entry in task side
       if (invalidSegments.size() > 0) {
-        List<TableSegmentUniqueIdentifier> invalidSegmentsIds
-            = new ArrayList<>(invalidSegments.size());
-        for (String segId : invalidSegments) {
-          invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(identifier, segId));
+        for (InputSplit split : splits) {
+          ((CarbonInputSplit) split).setInvalidSegments(invalidSegments);
+          ((CarbonInputSplit) split).setInvalidTimestampRange(invalidTimestampsList);
         }
-        cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds);
-      }
-    }
-
-    // process and resolve the expression
-    Expression filter = getFilterPredicates(job.getConfiguration());
-    CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
-    // this will be null in case of corrupt schema file.
-    if (null == carbonTable) {
-      throw new IOException("Missing/Corrupt schema file for table.");
-    }
-    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
-    FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier);
-
-    // do block filtering and get split
-    List<InputSplit> splits = getSplits(job, filterInterface, cacheClient);
-    cacheClient.close();
-    // pass the invalid segment to task side in order to remove index entry in task side
-    if (invalidSegments.size() > 0) {
-      for (InputSplit split : splits) {
-        ((CarbonInputSplit) split).setInvalidSegments(invalidSegments);
-        ((CarbonInputSplit) split).setInvalidTimestampRange(invalidTimestampsList);
       }
+      return splits;
+    } finally {
+      // close the cache client to clear LRU cache memory
+      cacheClient.close();
     }
-    return splits;
   }
 
   private List<InputSplit> getSplitsInternal(JobContext job) throws IOException {
@@ -354,37 +358,44 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
       String segmentId, CacheClient cacheClient, SegmentUpdateStatusManager updateStatusManager)
       throws IOException {
-    QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
-    QueryStatistic statistic = new QueryStatistic();
-    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
-        getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId, cacheClient,
-            updateStatusManager);
-
-    List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>();
-
-    if (null != segmentIndexMap) {
-      // build result
-      for (AbstractIndex abstractIndex : segmentIndexMap.values()) {
-        List<DataRefNode> filterredBlocks;
-        // if no filter is given get all blocks from Btree Index
-        if (null == resolver) {
-          filterredBlocks = getDataBlocksOfIndex(abstractIndex);
-        } else {
-          // apply filter and get matching blocks
-          filterredBlocks = filterExpressionProcessor.getFilterredBlocks(
-                  abstractIndex.getDataRefNode(),
-                  resolver,
-                  abstractIndex,
-                  absoluteTableIdentifier
-          );
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
+    try {
+      QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
+      QueryStatistic statistic = new QueryStatistic();
+      segmentIndexMap =
+          getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId, cacheClient,
+              updateStatusManager);
+      List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>();
+      if (null != segmentIndexMap) {
+        // build result
+        for (AbstractIndex abstractIndex : segmentIndexMap.values()) {
+          List<DataRefNode> filterredBlocks;
+          // if no filter is given get all blocks from Btree Index
+          if (null == resolver) {
+            filterredBlocks = getDataBlocksOfIndex(abstractIndex);
+          } else {
+            // apply filter and get matching blocks
+            filterredBlocks = filterExpressionProcessor
+                .getFilterredBlocks(abstractIndex.getDataRefNode(), resolver, abstractIndex,
+                    absoluteTableIdentifier);
+          }
+          resultFilterredBlocks.addAll(filterredBlocks);
         }
-        resultFilterredBlocks.addAll(filterredBlocks);
+      }
+      statistic
+          .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
+      recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
+      return resultFilterredBlocks;
+    } finally {
+      // clean up the access count for a segment as soon as its usage is complete so that in
+      // low memory systems the same memory can be utilized efficiently
+      if (null != segmentIndexMap) {
+        List<TableSegmentUniqueIdentifier> tableSegmentUniqueIdentifiers = new ArrayList<>(1);
+        tableSegmentUniqueIdentifiers
+            .add(new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId));
+        cacheClient.getSegmentAccessClient().clearAccessCount(tableSegmentUniqueIdentifiers);
       }
     }
-    statistic
-        .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
-    recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
-    return resultFilterredBlocks;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/673f8c22/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
index 49dadd3..53796bb 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
@@ -21,7 +21,7 @@ import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.cache.dictionary.ManageDictionary
+import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.encoder.Encoding
@@ -65,7 +65,7 @@ class AlterTableDropColumnRDD[K, V](sc: SparkContext,
         val columnSchema = split.asInstanceOf[DropColumnPartition].columnSchema
         if (columnSchema.hasEncoding(Encoding.DICTIONARY) &&
             !columnSchema.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-          ManageDictionary
+          ManageDictionaryAndBTree
             .deleteDictionaryFileAndCache(columnSchema, carbonTableIdentifier, carbonStorePath)
         }
       } catch {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/673f8c22/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 77a0d90..1451247 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -37,7 +37,7 @@ import org.codehaus.jackson.map.ObjectMapper
 
 import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.cache.dictionary.ManageDictionary
+import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
@@ -723,6 +723,12 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
         sys.error("Table is locked for deletion. Please try after some time")
       }
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
+      val carbonTable = CarbonEnv.get.carbonMetastore.getTableFromMetadata(dbName, tableName)
+        .map(_.carbonTable).getOrElse(null)
+      if (null != carbonTable) {
+        // clear driver B-tree and dictionary cache
+        ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
+      }
       CarbonEnv.get.carbonMetastore.dropTable(storePath, identifier)(sparkSession)
       LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
     } finally {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/673f8c22/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
index 30634d1..ac7dce3 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
@@ -171,7 +171,7 @@ class CarbonDataSourceSuite extends QueryTest with BeforeAndAfterAll {
       case e =>
         println(e.getMessage)
     }
-    checkAnswer(sql("select * from testdb.test1"), Seq(Row("xx", 1), Row("xx", 11)))
+    checkAnswer(sql("select count(*) from testdb.test1"), Seq(Row(2)))
     sql("drop table testdb.test1")
     sql("drop database testdb")
   }

