carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [1/2] incubator-carbondata git commit: fixed block loading issue in case of blocklet distribution
Date Mon, 07 Nov 2016 02:15:41 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master ed3810aa4 -> d2041356d


fixed block loading issue in case of blocklet distribution


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/041dab09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/041dab09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/041dab09

Branch: refs/heads/master
Commit: 041dab09f09c31aae95f1de954a7261bde53830c
Parents: ed3810a
Author: kumarvishal <kumarvishal.1802@gmail.com>
Authored: Wed Nov 2 22:32:42 2016 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Mon Nov 7 07:44:05 2016 +0530

----------------------------------------------------------------------
 .../core/carbon/datastore/BlockIndexStore.java  | 25 ++++++++++++++------
 .../core/carbon/datastore/block/BlockInfo.java  |  2 +-
 .../core/constants/CarbonCommonConstants.java   |  2 +-
 3 files changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/041dab09/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
index bbebda0..34c2983 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
@@ -80,6 +80,13 @@ public class BlockIndexStore {
    */
   private Map<AbsoluteTableIdentifier, Object> tableLockMap;
 
+  /**
+   * block info to future task mapping
+   * useful when blocklet distribution is enabled and
+   * same block is loaded by multiple thread
+   */
+  private Map<BlockInfo, Future<AbstractIndex>> mapOfBlockInfoToFuture;
+
   private BlockIndexStore() {
     tableBlocksMap = new ConcurrentHashMap<AbsoluteTableIdentifier, Map<BlockInfo,
AbstractIndex>>(
         CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
@@ -87,6 +94,7 @@ public class BlockIndexStore {
         CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     blockInfoLock = new ConcurrentHashMap<BlockInfo, Object>();
     segmentIdToBlockListMap = new ConcurrentHashMap<>();
+    mapOfBlockInfoToFuture = new ConcurrentHashMap<>();
   }
 
   /**
@@ -137,7 +145,6 @@ public class BlockIndexStore {
       blockInfosNeedToLoad = fillSegmentIdToTableInfoMap(tableBlocksInfos, absoluteTableIdentifier);
     }
     AbstractIndex tableBlock = null;
-    List<Future<AbstractIndex>> blocksList = new ArrayList<Future<AbstractIndex>>();
     int counter = -1;
     for (BlockInfo blockInfo : blockInfosNeedToLoad) {
       counter++;
@@ -173,7 +180,12 @@ public class BlockIndexStore {
           tableBlock = tableBlockMapTemp.get(blockInfo);
           // if still block is not present then load the block
           if (null == tableBlock) {
-            blocksList.add(executor.submit(new BlockLoaderThread(blockInfo, tableBlockMapTemp)));
+            if (null == mapOfBlockInfoToFuture.get(blockInfo)) {
+              mapOfBlockInfoToFuture.put(blockInfo, executor
+                  .submit(new BlockLoaderThread(blockInfo, tableBlockMapTemp)));
+            }
+          } else {
+            loadedBlock[counter] = tableBlock;
           }
         }
       } else {
@@ -190,7 +202,7 @@ public class BlockIndexStore {
       throw new IndexBuilderException(e);
     }
     // fill the block which were not loaded before to loaded blocks array
-    fillLoadedBlocks(loadedBlock, blocksList);
+    fillLoadedBlocks(loadedBlock, blockInfosNeedToLoad);
     return Arrays.asList(loadedBlock);
   }
 
@@ -234,13 +246,12 @@ public class BlockIndexStore {
    * @param blocksList       blocks loaded in thread
    * @throws IndexBuilderException in case of any failure
    */
-  private void fillLoadedBlocks(AbstractIndex[] loadedBlockArray,
-      List<Future<AbstractIndex>> blocksList) throws IndexBuilderException {
-    int blockCounter = 0;
+  private void fillLoadedBlocks(AbstractIndex[] loadedBlockArray, List<BlockInfo> blockInfos)
+      throws IndexBuilderException {
     for (int i = 0; i < loadedBlockArray.length; i++) {
       if (null == loadedBlockArray[i]) {
         try {
-          loadedBlockArray[i] = blocksList.get(blockCounter++).get();
+          loadedBlockArray[i] = mapOfBlockInfoToFuture.get(blockInfos.get(i)).get();
         } catch (InterruptedException | ExecutionException e) {
           throw new IndexBuilderException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/041dab09/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfo.java
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfo.java
index 8092c7c..96879aa 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfo.java
@@ -91,7 +91,7 @@ public class BlockInfo {
     if (info.getBlockOffset() != other.info.getBlockOffset()) {
       return false;
     }
-    if (info.getBlockLength() != info.getBlockLength()) {
+    if (info.getBlockLength() != other.info.getBlockLength()) {
       return false;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/041dab09/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 997f412..e217d5d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -882,7 +882,7 @@ public final class CarbonCommonConstants {
   /**
    * to enable blocklet distribution default value
    */
-  public static String ENABLE_BLOCKLET_DISTRIBUTION_DEFAULTVALUE = "true";
+  public static String ENABLE_BLOCKLET_DISTRIBUTION_DEFAULTVALUE = "false";
 
   /**
    * This batch size is used to send rows from load step to another step in batches.


Mime
View raw message