carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject [1/2] incubator-carbondata git commit: CARBONDATA-117 BlockLet distribution for optimum resource usage
Date Tue, 23 Aug 2016 15:01:58 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 2d4609cdf -> 5ebf90a87


CARBONDATA-117 BlockLet distribution for optimum resource usage


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/fe1b0f07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/fe1b0f07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/fe1b0f07

Branch: refs/heads/master
Commit: fe1b0f07deda03fe21b98191be7750bf61d8520c
Parents: 2d4609c
Author: mohammadshahidkhan <mohdshahidkhan1987@gmail.com>
Authored: Wed Jul 20 16:02:18 2016 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Tue Aug 23 20:19:07 2016 +0530

----------------------------------------------------------------------
 conf/carbon.properties.template                 |   2 +
 .../core/carbon/datastore/BlockIndexStore.java  |   6 +-
 .../carbon/datastore/block/BlockletInfos.java   | 112 +++++++++++++++++++
 .../carbon/datastore/block/TableBlockInfo.java  |  50 ++++++++-
 .../core/constants/CarbonCommonConstants.java   |  11 ++
 .../core/util/DataFileFooterConverter.java      |  28 ++++-
 .../executor/impl/AbstractQueryExecutor.java    |  17 ++-
 .../scan/executor/infos/BlockExecutionInfo.java |  46 ++++++++
 .../AbstractDetailQueryResultIterator.java      |  14 ++-
 .../carbondata/hadoop/CarbonInputFormat.java    |  23 +++-
 .../carbondata/hadoop/CarbonInputSplit.java     |  18 +++
 .../carbondata/hadoop/CarbonRecordReader.java   |   5 +-
 .../hadoop/ft/CarbonInputMapperTest.java        |   3 +-
 .../carbondata/spark/load/CarbonLoaderUtil.java |  74 ++++++++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  11 +-
 15 files changed, 397 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/conf/carbon.properties.template
----------------------------------------------------------------------
diff --git a/conf/carbon.properties.template b/conf/carbon.properties.template
index b94fbe4..bd26252 100644
--- a/conf/carbon.properties.template
+++ b/conf/carbon.properties.template
@@ -85,6 +85,8 @@ carbon.enable.quick.filter=false
 #carbon.load.metadata.lock.retries=3
 ##Maximum number of blocklets written in a single file :Min=1:Max=1000
 #carbon.max.file.size=100
+##Minimum blocklets needed for distribution.
+#carbon.blockletdistribution.min.blocklet.size=10
 ##Interval between the retries to get the lock
 #carbon.load.metadata.lock.retry.timeout.sec=5
 ##Temporary store location, By default it will take System.getProperty("java.io.tmpdir")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
index be48ce5..07815c0 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
@@ -20,7 +20,6 @@ package org.apache.carbondata.core.carbon.datastore;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -108,10 +107,7 @@ public class BlockIndexStore {
       AbsoluteTableIdentifier absoluteTableIdentifier) throws IndexBuilderException {
     AbstractIndex[] loadedBlock = new AbstractIndex[tableBlocksInfos.size()];
     addTableLockObject(absoluteTableIdentifier);
-    // sort the block info
-    // so block will be loaded in sorted order this will be required for
-    // query execution
-    Collections.sort(tableBlocksInfos);
+
     // get the instance
     Object lockObject = tableLockMap.get(absoluteTableIdentifier);
     Map<TableBlockInfo, AbstractIndex> tableBlockMapTemp = null;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockletInfos.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockletInfos.java
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockletInfos.java
new file mode 100644
index 0000000..4251888
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/BlockletInfos.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore.block;
+
+import java.io.Serializable;
+
+/**
+ * Holds the blocklet information of a block
+ */
+public class BlockletInfos implements Serializable {
+  /**
+   * no of blockLets
+   */
+  private int noOfBlockLets = 0;
+
+  /**
+   * start blocklet number
+   */
+  private int startBlockletNumber;
+  /**
+   * number of blocklets to scan
+   */
+  private int numberOfBlockletToScan;
+  /**
+   * default constructor
+   */
+  public BlockletInfos(){
+  }
+  /**
+   * constructor to initialize the blockletinfo
+   * @param noOfBlockLets
+   * @param startBlockletNumber
+   * @param numberOfBlockletToScan
+   */
+  public BlockletInfos(int noOfBlockLets, int startBlockletNumber, int numberOfBlockletToScan) {
+    this.noOfBlockLets = noOfBlockLets;
+    this.startBlockletNumber = startBlockletNumber;
+    this.numberOfBlockletToScan = numberOfBlockletToScan;
+  }
+
+  /**
+   * returns the number of blockLets
+   *
+   * @return
+   */
+  public int getNoOfBlockLets() {
+    return noOfBlockLets;
+  }
+
+  /**
+   * sets the number of blockLets
+   *
+   * @param noOfBlockLets
+   */
+  public void setNoOfBlockLets(int noOfBlockLets) {
+    this.noOfBlockLets = noOfBlockLets;
+  }
+
+  /**
+   * returns start blocklet number
+   *
+   * @return
+   */
+  public int getStartBlockletNumber() {
+    return startBlockletNumber;
+  }
+
+  /**
+   * set start blocklet number
+   *
+   * @param startBlockletNumber
+   */
+  public void setStartBlockletNumber(int startBlockletNumber) {
+    this.startBlockletNumber = startBlockletNumber;
+  }
+
+  /**
+   * returns the number of blocklets to scan
+   *
+   * @return
+   */
+  public int getNumberOfBlockletToScan() {
+    return numberOfBlockletToScan;
+  }
+
+  /**
+   * sets the number of blocklets to be scanned
+   *
+   * @param numberOfBlockletToScan
+   */
+  public void setNumberOfBlockletToScan(int numberOfBlockletToScan) {
+    this.numberOfBlockletToScan = numberOfBlockletToScan;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
index 06166fd..3d393b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
@@ -58,7 +58,10 @@ public class TableBlockInfo extends Distributable
   private String segmentId;
 
   private String[] locations;
-
+  /**
+   * The class holds the blockletsinfo
+   */
+  private BlockletInfos blockletInfos = new BlockletInfos();
 
   public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
       long blockLength) {
@@ -70,6 +73,25 @@ public class TableBlockInfo extends Distributable
   }
 
   /**
+   * constructor to initialize the TableBlockInfo with BlockletInfos
+   * @param filePath
+   * @param blockOffset
+   * @param segmentId
+   * @param locations
+   * @param blockLength
+   * @param blockletInfos
+   */
+  public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
+      long blockLength, BlockletInfos blockletInfos) {
+    this.filePath = FileFactory.getUpdatedFilePath(filePath);
+    this.blockOffset = blockOffset;
+    this.segmentId = segmentId;
+    this.locations = locations;
+    this.blockLength = blockLength;
+    this.blockletInfos = blockletInfos;
+  }
+
+  /**
    * @return the filePath
    */
   public String getFilePath() {
@@ -185,6 +207,16 @@ public class TableBlockInfo extends Distributable
         > ((TableBlockInfo) other).blockOffset + ((TableBlockInfo) other).blockLength)
{
       return 1;
     }
+    //compare the startBlockLetNumber
+    int diffStartBlockLetNumber =
+        blockletInfos.getStartBlockletNumber() - ((TableBlockInfo) other).blockletInfos
+            .getStartBlockletNumber();
+    if (diffStartBlockLetNumber < 0) {
+      return -1;
+    }
+    if (diffStartBlockLetNumber > 0) {
+      return 1;
+    }
     return 0;
   }
 
@@ -194,6 +226,7 @@ public class TableBlockInfo extends Distributable
     result = 31 * result + (int) (blockLength ^ (blockLength >>> 32));
     result = 31 * result + segmentId.hashCode();
     result = 31 * result + Arrays.hashCode(locations);
+    result = 31 * result + blockletInfos.getStartBlockletNumber();
     return result;
   }
 
@@ -201,4 +234,19 @@ public class TableBlockInfo extends Distributable
     return locations;
   }
 
+  /**
+   * returns BlockletInfos
+   * @return
+   */
+  public BlockletInfos getBlockletInfos() {
+    return blockletInfos;
+  }
+
+  /**
+   * set the BlockletInfos
+   * @param blockletInfos
+   */
+  public void setBlockletInfos(BlockletInfos blockletInfos) {
+    this.blockletInfos = blockletInfos;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 4fa77ba..21e5562 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -885,6 +885,17 @@ public final class CarbonCommonConstants {
    */
   public static final String ZOOKEEPER_URL = "spark.deploy.zookeeper.url";
 
+  /**
+   * configure the minimum number of blocklets a block needs to be eligible for blocklet distribution
+   */
+  public static final String CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE =
+      "carbon.blockletdistribution.min.blocklet.size";
+
+  /**
+   * default minimum number of blocklets required for blocklet distribution
+   */
+  public static final int DEFAULT_CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE = 2;
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
index 3a1da8c..8d7e893 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.carbon.metadata.blocklet.sort.SortState;
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastorage.store.FileHolder;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
@@ -70,7 +71,7 @@ public class DataFileFooterConverter {
    * @throws IOException problem while reading the index file
    */
   public List<DataFileFooter> getIndexInfo(String filePath, List<TableBlockInfo>
tableBlockInfoList)
-      throws IOException {
+      throws IOException, CarbonUtilException {
     CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
     List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
     try {
@@ -94,10 +95,13 @@ public class DataFileFooterConverter {
         BlockIndex readBlockIndexInfo = indexReader.readBlockIndexInfo();
         blockletIndex = getBlockletIndex(readBlockIndexInfo.getBlock_index());
         dataFileFooter = new DataFileFooter();
+        TableBlockInfo tableBlockInfo = tableBlockInfoList.get(counter++);
+        int blockletSize = getBlockletSize(readBlockIndexInfo);
+        tableBlockInfo.getBlockletInfos().setNoOfBlockLets(blockletSize);
         dataFileFooter.setBlockletIndex(blockletIndex);
         dataFileFooter.setColumnInTable(columnSchemaList);
         dataFileFooter.setNumberOfRows(readBlockIndexInfo.getNum_rows());
-        dataFileFooter.setTableBlockInfo(tableBlockInfoList.get(counter++));
+        dataFileFooter.setTableBlockInfo(tableBlockInfo);
         dataFileFooter.setSegmentInfo(segmentInfo);
         dataFileFooters.add(dataFileFooter);
       }
@@ -108,6 +112,26 @@ public class DataFileFooterConverter {
   }
 
   /**
+   * the method returns the number of blocklets in a block
+   * @param readBlockIndexInfo
+   * @return
+   */
+  private int getBlockletSize(BlockIndex readBlockIndexInfo) {
+    long num_rows = readBlockIndexInfo.getNum_rows();
+    int blockletSize = Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
+            CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL));
+    int remainder = (int) (num_rows % blockletSize);
+    int noOfBlockLet = (int) (num_rows / blockletSize);
+    // there could be some blocklets which will not
+    // contain the total records equal to the blockletSize
+    if (remainder > 0) {
+      noOfBlockLet = noOfBlockLet + 1;
+    }
+    return noOfBlockLet;
+  }
+
+  /**
    * Below method will be used to convert thrift file meta to wrapper file meta
    */
   public DataFileFooter readDataFileFooter(String filePath, long blockOffset, long blockLength)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
index e204572..832b2fa 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.scan.executor.impl;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -94,6 +95,10 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E>
{
     queryModel.setStatisticsRecorder(queryProperties.queryStatisticsRecorder);
     QueryUtil.resolveQueryModel(queryModel);
     QueryStatistic queryStatistic = new QueryStatistic();
+    // sort the block info
+    // so block will be loaded in sorted order this will be required for
+    // query execution
+    Collections.sort(queryModel.getTableBlockInfos());
     // get the table blocks
     try {
       queryProperties.dataBlocks = BlockIndexStore.getInstance()
@@ -194,8 +199,11 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E>
{
     // query
     // and query will be executed based on that infos
     for (int i = 0; i < queryProperties.dataBlocks.size(); i++) {
-      blockExecutionInfoList
-          .add(getBlockExecutionInfoForBlock(queryModel, queryProperties.dataBlocks.get(i)));
+      blockExecutionInfoList.add(
+          getBlockExecutionInfoForBlock(queryModel, queryProperties.dataBlocks.get(i),
+              queryModel.getTableBlockInfos().get(i).getBlockletInfos().getStartBlockletNumber(),
+              queryModel.getTableBlockInfos().get(i).getBlockletInfos()
+                  .getNumberOfBlockletToScan()));
     }
     queryProperties.complexDimensionInfoMap =
         blockExecutionInfoList.get(blockExecutionInfoList.size() - 1).getComlexDimensionInfoMap();
@@ -212,7 +220,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E>
{
    * @throws QueryExecutionException any failure during block info creation
    */
   protected BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel,
-      AbstractIndex blockIndex) throws QueryExecutionException {
+      AbstractIndex blockIndex, int startBlockletIndex, int numberOfBlockletToScan)
+      throws QueryExecutionException {
     BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
     SegmentProperties segmentProperties = blockIndex.getSegmentProperties();
     List<CarbonDimension> tableBlockDimensions = segmentProperties.getDimensions();
@@ -228,6 +237,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E>
{
         QueryUtil.getMaskedByteRange(updatedQueryDimension, blockKeyGenerator);
     int[] maksedByte =
         QueryUtil.getMaskedByte(blockKeyGenerator.getKeySizeInBytes(), maskByteRangesForBlock);
+    blockExecutionInfo.setStartBlockletIndex(startBlockletIndex);
+    blockExecutionInfo.setNumberOfBlockletToScan(numberOfBlockletToScan);
     blockExecutionInfo.setQueryDimensions(
         updatedQueryDimension.toArray(new QueryDimension[updatedQueryDimension.size()]));
     blockExecutionInfo.setQueryMeasures(queryModel.getQueryMeasures()

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
b/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
index ca5c2e0..d84a183 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
@@ -200,6 +200,16 @@ public class BlockExecutionInfo {
   private boolean isRawRecordDetailQuery;
 
   /**
+   * start index of blocklets
+   */
+  private int startBlockletIndex;
+
+  /**
+   * number of blocklet to be scanned
+   */
+  private int numberOfBlockletToScan;
+
+  /**
    * complexParentIndexToQueryMap
    */
   private Map<Integer, GenericQueryType> complexParentIndexToQueryMap;
@@ -678,4 +688,40 @@ public class BlockExecutionInfo {
   public void setQueryMeasures(QueryMeasure[] queryMeasures) {
     this.queryMeasures = queryMeasures;
   }
+
+  /**
+   * The method to set the number of blocklets to be scanned
+   *
+   * @param numberOfBlockletToScan
+   */
+  public void setNumberOfBlockletToScan(int numberOfBlockletToScan) {
+    this.numberOfBlockletToScan = numberOfBlockletToScan;
+  }
+
+  /**
+   * get the no of blocklet  to be scanned
+   *
+   * @return
+   */
+  public int getNumberOfBlockletToScan() {
+    return numberOfBlockletToScan;
+  }
+
+  /**
+   * returns the blocklet index to be scanned
+   *
+   * @return
+   */
+  public int getStartBlockletIndex() {
+    return startBlockletIndex;
+  }
+
+  /**
+   * set the blocklet index to be scanned
+   *
+   * @param startBlockletIndex
+   */
+  public void setStartBlockletIndex(int startBlockletIndex) {
+    this.startBlockletIndex = startBlockletIndex;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
index 2abe39a..d583560 100644
--- a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -92,9 +92,17 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator
{
       DataRefNodeFinder finder = new BTreeDataRefNodeFinder(blockInfo.getEachColumnValueSize());
       DataRefNode startDataBlock = finder
           .findFirstDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getStartKey());
-      DataRefNode endDataBlock = finder
-          .findLastDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getEndKey());
-      long numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1;
+      while(startDataBlock.nodeNumber()!= blockInfo.getStartBlockletIndex()) {
+        startDataBlock = startDataBlock.getNextDataRefNode();
+      }
+
+      long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan();
+      //if the number of blocklets to scan is not positive, derive it from the end block.
+      if (numberOfBlockToScan <= 0) {
+        DataRefNode endDataBlock = finder
+            .findLastDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getEndKey());
+        numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1;
+      }
       blockInfo.setFirstDataBlock(startDataBlock);
       blockInfo.setNumberOfBlockToScan(numberOfBlockToScan);
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 04cbd53..6bc692f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.carbon.datastore.DataRefNodeFinder;
 import org.apache.carbondata.core.carbon.datastore.IndexKey;
 import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore;
 import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
 import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
@@ -273,7 +274,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void,
T> {
       }
 
       if (filterPredicates == null) {
-        return getSplitsInternal(job);
+        return getSplitsNonFilter(job);
       } else {
         if (filterPredicates instanceof Expression) {
           //process and resolve the expression.
@@ -290,6 +291,19 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void,
T> {
     }
   }
 
+  /**
+   * the method will return the blocks to be scanned with blocklets info
+   *
+   * @param job
+   * @return
+   * @throws IOException
+   * @throws IndexBuilderException
+   */
+  private List<InputSplit> getSplitsNonFilter(JobContext job)
+      throws IOException, IndexBuilderException {
+    return getSplits(job, null);
+  }
+
   private List<InputSplit> getSplitsInternal(JobContext job) throws IOException {
     List<InputSplit> splits = super.getSplits(job);
     List<InputSplit> carbonSplits = new ArrayList<InputSplit>(splits.size());
@@ -333,7 +347,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void,
T> {
         TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
         result.add(new CarbonInputSplit(segmentNo, new Path(tableBlockInfo.getFilePath()),
             tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
-            tableBlockInfo.getLocations()));
+            tableBlockInfo.getLocations(), tableBlockInfo.getBlockletInfos().getNoOfBlockLets()));
       }
     }
     return result;
@@ -496,9 +510,12 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void,
T> {
       // identify table blocks
       for (InputSplit inputSplit : getSplitsInternal(newJob)) {
         CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
+        BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
+            carbonInputSplit.getNumberOfBlocklets());
         tableBlockInfoList.add(
             new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
-                segmentId, carbonInputSplit.getLocations(), carbonInputSplit.getLength()));
+                segmentId, carbonInputSplit.getLocations(), carbonInputSplit.getLength(),
+                blockletInfos));
       }
 
       Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index d215f13..01744f8 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -34,6 +34,10 @@ public class CarbonInputSplit extends FileSplit implements Serializable,
Writabl
 
   private static final long serialVersionUID = 3520344046772190207L;
   private String segmentId;
+  /**
+   * Number of BlockLets in a block
+   */
+  private int numberOfBlocklets = 0;
 
   public CarbonInputSplit() {
     super(null, 0, 0, new String[0]);
@@ -45,6 +49,12 @@ public class CarbonInputSplit extends FileSplit implements Serializable,
Writabl
     this.segmentId = segmentId;
   }
 
+  public CarbonInputSplit(String segmentId, Path path, long start, long length,
+      String[] locations, int numberOfBlocklets) {
+    this(segmentId, path, start, length, locations);
+    this.numberOfBlocklets = numberOfBlocklets;
+  }
+
   public static CarbonInputSplit from(String segmentId, FileSplit split) throws IOException
{
     return new CarbonInputSplit(segmentId, split.getPath(), split.getStart(), split.getLength(),
         split.getLocations());
@@ -66,4 +76,12 @@ public class CarbonInputSplit extends FileSplit implements Serializable,
Writabl
     out.writeUTF(segmentId);
   }
 
+  /**
+   * returns the number of blocklets
+   * @return
+   */
+  public int getNumberOfBlocklets() {
+    return numberOfBlocklets;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index caddfd1..fd0a438 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
@@ -58,10 +59,12 @@ public class CarbonRecordReader<T> extends RecordReader<Void,
T> {
       throws IOException, InterruptedException {
     CarbonInputSplit carbonInputSplit = (CarbonInputSplit) split;
     List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
+    BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
+        carbonInputSplit.getNumberOfBlocklets());
     tableBlockInfoList.add(
         new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
             carbonInputSplit.getSegmentId(), carbonInputSplit.getLocations(),
-            carbonInputSplit.getLength()));
+            carbonInputSplit.getLength(), blockletInfos));
     queryModel.setTableBlockInfos(tableBlockInfoList);
     readSupport
         .intialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
index cdfd325..435b9c1 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
@@ -52,7 +52,8 @@ import org.junit.Test;
 
 public class CarbonInputMapperTest extends TestCase {
 
-  @Before public void setUp() throws Exception {
+  // changed setUp to a static init block to avoid unwanted repeated store creation
+  static {
     StoreCreator.createCarbonStore();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index b934b1d..918702d 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -49,7 +49,9 @@ import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.ColumnIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
 import org.apache.carbondata.core.carbon.datastore.block.Distributable;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
@@ -93,6 +95,22 @@ public final class CarbonLoaderUtil {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(CarbonLoaderUtil.class.getName());
+  /**
+   * minimum no of blocklet required for distribution
+   */
+  private static int minBlockLetsReqForDistribution = 0;
+
+  static {
+    String property = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE);
+    try {
+      minBlockLetsReqForDistribution = Integer.parseInt(property);
+    } catch (NumberFormatException ne) {
+      LOGGER.info("Invalid configuration. Consisering the defaul");
+      minBlockLetsReqForDistribution =
+          CarbonCommonConstants.DEFAULT_CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE;
+    }
+  }
 
   /**
    * dfs.bytes-per-checksum
@@ -1429,4 +1447,60 @@ public final class CarbonLoaderUtil {
   public static String[] getConfiguredLocalDirs(SparkConf conf) {
     return Utils.getConfiguredLocalDirs(conf);
   }
+
+  /**
+   * method to distribute the blocklets of a block in multiple blocks
+   * @param blockInfoList
+   * @param defaultParallelism
+   * @return
+     */
+  public static List<Distributable> distributeBlockLets(List<TableBlockInfo> blockInfoList,
+      int defaultParallelism) {
+    LOGGER.info("No.Of Blocks before Blocklet distribution: " + blockInfoList.size());
+    List<Distributable> tableBlockInfos = new ArrayList<Distributable>();
+    if (blockInfoList.size() < defaultParallelism) {
+      for (TableBlockInfo tableBlockInfo : blockInfoList) {
+        int noOfBlockLets = tableBlockInfo.getBlockletInfos().getNoOfBlockLets();
+        LOGGER.info(
+            "No.Of blocklet : " + noOfBlockLets + ".Minimum blocklets required for distribution : "
+                + minBlockLetsReqForDistribution);
+        if (noOfBlockLets < minBlockLetsReqForDistribution) {
+          tableBlockInfos.add(tableBlockInfo);
+          continue;
+        }
+        TableBlockInfo tableBlockInfo1 = null;
+        int rem = noOfBlockLets % minBlockLetsReqForDistribution;
+        int count = noOfBlockLets / minBlockLetsReqForDistribution;
+        if (rem > 0) {
+          count = count + 1;
+        }
+        for (int i = 0; i < count; i++) {
+          BlockletInfos blockletInfos = new BlockletInfos();
+          blockletInfos.setStartBlockletNumber(i * minBlockLetsReqForDistribution);
+          blockletInfos.setNumberOfBlockletToScan(minBlockLetsReqForDistribution);
+          blockletInfos.setNoOfBlockLets(blockletInfos.getNoOfBlockLets());
+          tableBlockInfo1 =
+              new TableBlockInfo(tableBlockInfo.getFilePath(), tableBlockInfo.getBlockOffset(),
+                  tableBlockInfo.getSegmentId(), tableBlockInfo.getLocations(),
+                  tableBlockInfo.getBlockLength(), blockletInfos);
+          tableBlockInfos.add(tableBlockInfo1);
+        }
+        //if rem is greater than 0 then for the last block
+        if (rem > 0) {
+          tableBlockInfo1.getBlockletInfos().setNumberOfBlockletToScan(rem);
+        }
+      }
+    }
+    if (tableBlockInfos.size() == 0) {
+      {
+        for (TableBlockInfo tableBlockInfo : blockInfoList) {
+          tableBlockInfos.add(tableBlockInfo);
+        }
+        LOGGER.info("No.Of Blocks after Blocklet distribution: " + tableBlockInfos.size());
+        return tableBlockInfos;
+      }
+    }
+    LOGGER.info("No.Of Blocks after Blocklet distribution: " + tableBlockInfos.size());
+    return tableBlockInfos;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe1b0f07/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 23e03e4..6c0438f 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.Dictionary
-import org.apache.carbondata.core.carbon.datastore.block.{Distributable, TableBlockInfo}
+import org.apache.carbondata.core.carbon.datastore.block.{BlockletInfos, TableBlockInfo}
 import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsRecorder}
 import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
 import org.apache.carbondata.scan.executor.QueryExecutorFactory
@@ -103,12 +103,15 @@ class CarbonScanRDD[V: ClassTag](
     if (!splits.isEmpty) {
       val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
 
-      val blockList = carbonInputSplits.map(inputSplit =>
+      val blockListTemp = carbonInputSplits.map(inputSplit =>
         new TableBlockInfo(inputSplit.getPath.toString,
           inputSplit.getStart, inputSplit.getSegmentId,
-          inputSplit.getLocations, inputSplit.getLength
-        ).asInstanceOf[Distributable]
+          inputSplit.getLocations, inputSplit.getLength,
+          new BlockletInfos(inputSplit.getNumberOfBlocklets, 0, inputSplit.getNumberOfBlocklets)
+        )
       )
+      val blockList = CarbonLoaderUtil.
+        distributeBlockLets(blockListTemp.asJava, defaultParallelism).asScala
       if (blockList.nonEmpty) {
         // group blocks to nodes, tasks
         val startTime = System.currentTimeMillis



Mime
View raw message