carbondata-commits mailing list archives

From: manishgupt...@apache.org
Subject: carbondata git commit: [CARBONDATA-2020] Fix to avoid reading all block information in the driver for old stores
Date: Wed, 17 Jan 2018 04:36:08 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master e820006bc -> 6094af682


[CARBONDATA-2020] Fix to avoid reading all block information in the driver for old stores

Problem:
For old stores created prior to version 1.2, no blocklet information is stored in the
carbonindex file, so the new code has to read the footers of all carbondata files inside
the driver to get the blocklet information. That makes first-time queries slower: as
observed, a count(*) query that took 2 seconds on the old version takes a very long time
after the upgrade.

Solution:
If no blocklet information is available in the carbonindex file, do not read the
carbondata file footers on the driver side. Instead, read the carbondata files in the
executor to get the blocklet information.
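
In effect, the driver now branches on whether the index file carries blocklet metadata:
for old stores it records only block-level info plus the footer offset, and the executor
reads the footer itself later. A minimal sketch of that branch (simplified from the
BlockletDataMap change in the diff below; names follow the diff):

    // Simplified sketch, not the verbatim patch: pre-1.2 index files carry no
    // blocklet list, so only block-level info and the footer offset are kept in
    // the driver; the executor reads the data file footer later.
    if (fileFooter.getBlockletList() == null) {
      // old store: defer footer reading to the executor
      summaryRow = loadToUnsafeBlock(fileFooter, segmentProperties, filePath, summaryRow);
    } else {
      // new store: blocklet info is already present in the index file
      summaryRow = loadToUnsafe(fileFooter, segmentProperties, filePath, summaryRow);
    }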

This closes #1789


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6094af68
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6094af68
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6094af68

Branch: refs/heads/master
Commit: 6094af6823be994f4a18d5cd3502bc6e2aef3d22
Parents: e820006
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Wed Jan 10 21:05:48 2018 +0530
Committer: manishgupta88 <tomanishgupta18@gmail.com>
Committed: Wed Jan 17 10:08:21 2018 +0530

----------------------------------------------------------------------
 .../core/datastore/block/TableBlockInfo.java    |  19 ++
 .../indexstore/BlockletDataMapIndexStore.java   |  43 ++++-
 .../core/indexstore/BlockletDetailInfo.java     |  91 +++++++++-
 .../blockletindex/BlockletDataMap.java          | 179 ++++++++++++++++---
 .../indexstore/blockletindex/IndexWrapper.java  |  12 +-
 .../executor/impl/AbstractQueryExecutor.java    |  36 +++-
 .../presto/impl/CarbonLocalInputSplit.java      |   6 +
 7 files changed, 343 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6094af68/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index 69d10ae..c3cc551 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -165,6 +165,25 @@ public class TableBlockInfo implements Distributable, Serializable {
   }
 
   /**
+   * Create copy of TableBlockInfo object
+   */
+  public TableBlockInfo copy() {
+    TableBlockInfo info = new TableBlockInfo();
+    info.filePath = filePath;
+    info.blockOffset = blockOffset;
+    info.blockLength = blockLength;
+    info.segmentId = segmentId;
+    info.blockletId = blockletId;
+    info.locations = locations;
+    info.version = version;
+    info.blockletInfos = blockletInfos;
+    info.blockStorageIdMap = blockStorageIdMap;
+    info.deletedDeltaFilePath = deletedDeltaFilePath;
+    info.detailInfo = detailInfo.copy();
+    return info;
+  }
+
+  /**
    * @return the filePath
    */
   public String getFilePath() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6094af68/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index 5b13ac8..f2beae7 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -21,7 +21,11 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -32,6 +36,7 @@ import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapModel;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.PartitionMapFileStore;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 /**
@@ -93,6 +98,7 @@ public class BlockletDataMapIndexStore
      List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
     List<BlockletDataMap> blockletDataMaps = new ArrayList<>(tableSegmentUniqueIdentifiers.size());
     List<TableBlockIndexUniqueIdentifier> missedIdentifiers = new ArrayList<>();
+    ExecutorService service = null;
     // Get the datamaps for each indexfile from cache.
     try {
       for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) {
@@ -106,6 +112,11 @@ public class BlockletDataMapIndexStore
       if (missedIdentifiers.size() > 0) {
         Map<String, SegmentIndexFileStore> segmentIndexFileStoreMap = new HashMap<>();
         Map<String, PartitionMapFileStore> partitionFileStoreMap = new HashMap<>();
+        service =
+            Executors.newCachedThreadPool(
+                new CarbonThreadFactory("BlockletDataMapIndexStore:" + missedIdentifiers.get(0)
+                    .getAbsoluteTableIdentifier().getTableName()));
+        List<Future<BlockletDataMap>> futureList = new ArrayList<>();
         for (TableBlockIndexUniqueIdentifier identifier: missedIdentifiers) {
           SegmentIndexFileStore indexFileStore =
               segmentIndexFileStoreMap.get(identifier.getSegmentId());
@@ -124,7 +135,12 @@ public class BlockletDataMapIndexStore
             partitionFileStore.readAllPartitionsOfSegment(segmentPath);
             partitionFileStoreMap.put(identifier.getSegmentId(), partitionFileStore);
           }
-          blockletDataMaps.add(loadAndGetDataMap(identifier, indexFileStore, partitionFileStore));
+          BlockletDataMapLoader blockletDataMapLoader =
+              new BlockletDataMapLoader(identifier, indexFileStore, partitionFileStore);
+          futureList.add(service.submit(blockletDataMapLoader));
+        }
+        for (Future<BlockletDataMap> dataMapFuture : futureList) {
+          blockletDataMaps.add(dataMapFuture.get());
         }
       }
     } catch (Throwable e) {
@@ -132,6 +148,10 @@ public class BlockletDataMapIndexStore
         dataMap.clear();
       }
       throw new IOException("Problem in loading segment blocks.", e);
+    } finally {
+      if (service != null) {
+        service.shutdownNow();
+      }
     }
     return blockletDataMaps;
   }
@@ -160,6 +180,27 @@ public class BlockletDataMapIndexStore
   }
 
   /**
+   * This class is used to parallelize reading of index files.
+   */
+  private class BlockletDataMapLoader implements Callable<BlockletDataMap> {
+
+    private TableBlockIndexUniqueIdentifier identifier;
+    private SegmentIndexFileStore indexFileStore;
+    private PartitionMapFileStore partitionFileStore;
+
+    public BlockletDataMapLoader(TableBlockIndexUniqueIdentifier identifier,
+        SegmentIndexFileStore indexFileStore, PartitionMapFileStore partitionFileStore) {
+      this.identifier = identifier;
+      this.indexFileStore = indexFileStore;
+      this.partitionFileStore = partitionFileStore;
+    }
+
+    @Override public BlockletDataMap call() throws Exception {
+      return loadAndGetDataMap(identifier, indexFileStore, partitionFileStore);
+    }
+  }
+
+  /**
    * Below method will be used to load the segment of segments
   * One segment may have multiple tasks, so the table segment will be loaded
   * based on task id and will return the map of taskId to table segment

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6094af68/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
index 658a1e9..5f4224c 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
@@ -16,14 +16,20 @@
  */
 package org.apache.carbondata.core.indexstore;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 
 import org.apache.hadoop.io.Writable;
+import org.xerial.snappy.Snappy;
 
 /**
  * Blocklet detail information to be sent to each executor
@@ -44,6 +50,12 @@ public class BlockletDetailInfo implements Serializable, Writable {
 
   private BlockletInfo blockletInfo;
 
+  private long blockFooterOffset;
+
+  private List<ColumnSchema> columnSchemas;
+
+  private byte[] columnSchemaBinary;
+
   public int getRowCount() {
     return rowCount;
   }
@@ -102,7 +114,13 @@ public class BlockletDetailInfo implements Serializable, Writable {
       out.writeInt(dimLens[i]);
     }
     out.writeLong(schemaUpdatedTimeStamp);
-    blockletInfo.write(out);
+    out.writeBoolean(blockletInfo != null);
+    if (blockletInfo != null) {
+      blockletInfo.write(out);
+    }
+    out.writeLong(blockFooterOffset);
+    out.writeInt(columnSchemaBinary.length);
+    out.write(columnSchemaBinary);
   }
 
   @Override public void readFields(DataInput in) throws IOException {
@@ -115,8 +133,51 @@ public class BlockletDetailInfo implements Serializable, Writable {
       dimLens[i] = in.readInt();
     }
     schemaUpdatedTimeStamp = in.readLong();
-    blockletInfo = new BlockletInfo();
-    blockletInfo.readFields(in);
+    if (in.readBoolean()) {
+      blockletInfo = new BlockletInfo();
+      blockletInfo.readFields(in);
+    }
+    blockFooterOffset = in.readLong();
+    int bytesSize = in.readInt();
+    byte[] schemaArray = new byte[bytesSize];
+    in.readFully(schemaArray);
+    readColumnSchema(schemaArray);
+  }
+
+  /**
+   * Read column schema from binary
+   * @param schemaArray
+   * @throws IOException
+   */
+  public void readColumnSchema(byte[] schemaArray) throws IOException {
+    // uncompress it.
+    schemaArray = Snappy.uncompress(schemaArray);
+    ByteArrayInputStream schemaStream = new ByteArrayInputStream(schemaArray);
+    DataInput schemaInput = new DataInputStream(schemaStream);
+    columnSchemas = new ArrayList<>();
+    int size = schemaInput.readShort();
+    for (int i = 0; i < size; i++) {
+      ColumnSchema columnSchema = new ColumnSchema();
+      columnSchema.readFields(schemaInput);
+      columnSchemas.add(columnSchema);
+    }
+  }
+
+  /**
+   * Create copy of BlockletDetailInfo
+   */
+  public BlockletDetailInfo copy() {
+    BlockletDetailInfo detailInfo = new BlockletDetailInfo();
+    detailInfo.rowCount = rowCount;
+    detailInfo.pagesCount = pagesCount;
+    detailInfo.versionNumber = versionNumber;
+    detailInfo.blockletId = blockletId;
+    detailInfo.dimLens = dimLens;
+    detailInfo.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp;
+    detailInfo.blockletInfo = blockletInfo;
+    detailInfo.blockFooterOffset = blockFooterOffset;
+    detailInfo.columnSchemas = columnSchemas;
+    return detailInfo;
   }
 
   public Short getBlockletId() {
@@ -126,4 +187,28 @@ public class BlockletDetailInfo implements Serializable, Writable {
   public void setBlockletId(Short blockletId) {
     this.blockletId = blockletId;
   }
+
+  public long getBlockFooterOffset() {
+    return blockFooterOffset;
+  }
+
+  public void setBlockFooterOffset(long blockFooterOffset) {
+    this.blockFooterOffset = blockFooterOffset;
+  }
+
+  public List<ColumnSchema> getColumnSchemas() {
+    return columnSchemas;
+  }
+
+  public void setColumnSchemas(List<ColumnSchema> columnSchemas) {
+    this.columnSchemas = columnSchemas;
+  }
+
+  public void setColumnSchemaBinary(byte[] columnSchemaBinary) {
+    this.columnSchemaBinary = columnSchemaBinary;
+  }
+
+  public byte[] getColumnSchemaBinary() {
+    return columnSchemaBinary;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6094af68/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 43953ce..7c620d7 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -50,6 +50,7 @@ import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -61,10 +62,11 @@ import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
+import org.xerial.snappy.Snappy;
+
 /**
  * Datamap implementation for blocklet.
  */
@@ -93,10 +95,16 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   private static int BLOCK_INFO_INDEX = 8;
 
+  private static int BLOCK_FOOTER_OFFSET = 9;
+
   private static int TASK_MIN_VALUES_INDEX = 0;
 
   private static int TASK_MAX_VALUES_INDEX = 1;
 
+  private static int SCHEMA = 2;
+
+  private static int PARTITION_INFO = 3;
+
   private UnsafeMemoryDMStore unsafeMemoryDMStore;
 
   private UnsafeMemoryDMStore unsafeMemorySummaryDMStore;
@@ -117,28 +125,36 @@ public class BlockletDataMap implements DataMap, Cacheable {
         .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData());
     isPartitionedSegment = blockletDataMapInfo.isPartitionedSegment();
     DataMapRowImpl summaryRow = null;
+    byte[] schemaBinary = null;
     for (DataFileFooter fileFooter : indexInfo) {
-      List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
       if (segmentProperties == null) {
+        List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
+        schemaBinary = convertSchemaToBinary(columnInTable);
         columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
         segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
         createSchema(segmentProperties);
-        createSummarySchema(segmentProperties, blockletDataMapInfo.getPartitions());
+        createSummarySchema(segmentProperties, blockletDataMapInfo.getPartitions(), schemaBinary);
       }
       TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
-      if (fileFooter.getBlockletList() == null || fileFooter.getBlockletList().size() == 0) {
-        LOGGER
-            .info("Reading carbondata file footer to get blocklet info " + blockInfo.getFilePath());
-        fileFooter = CarbonUtil.readMetadatFile(blockInfo);
+      if (fileFooter.getBlockletList() == null) {
+        // This is the old store scenario: blocklet information is not available in the index
+        // file, so load only block info
+        summaryRow =
+            loadToUnsafeBlock(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow);
+      } else {
+        // Here it loads info about all blocklets of the index
+        summaryRow =
+            loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow);
       }
-
-      summaryRow = loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow);
     }
     if (unsafeMemoryDMStore != null) {
       unsafeMemoryDMStore.finishWriting();
     }
     if (null != unsafeMemorySummaryDMStore) {
-      addTaskSummaryRowToUnsafeMemoryStore(summaryRow, blockletDataMapInfo.getPartitions());
+      addTaskSummaryRowToUnsafeMemoryStore(
+          summaryRow,
+          blockletDataMapInfo.getPartitions(),
+          schemaBinary);
       unsafeMemorySummaryDMStore.finishWriting();
     }
     LOGGER.info(
@@ -203,7 +219,9 @@ public class BlockletDataMap implements DataMap, Cacheable {
         DataOutput dataOutput = new DataOutputStream(stream);
         blockletInfo.write(dataOutput);
         serializedData = stream.toByteArray();
-        row.setByteArray(serializedData, ordinal);
+        row.setByteArray(serializedData, ordinal++);
+        // Add block footer offset; it is used if we need to read the footer of the block
+        row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal);
         unsafeMemoryDMStore.addIndexRowToUnsafe(row);
       } catch (Exception e) {
         throw new RuntimeException(e);
@@ -213,21 +231,91 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return summaryRow;
   }
 
+  /**
+   * Load information for the block. This case can happen only for old stores
+   * where blocklet information is not available in the index file, so load only
+   * block information and read blocklet information in the executor.
+   */
+  private DataMapRowImpl loadToUnsafeBlock(DataFileFooter fileFooter,
+      SegmentProperties segmentProperties, String filePath, DataMapRowImpl summaryRow) {
+    int[] minMaxLen = segmentProperties.getColumnsValueSize();
+    BlockletIndex blockletIndex = fileFooter.getBlockletIndex();
+    CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
+    // Add one row to maintain task level min max for segment pruning
+    if (summaryRow == null) {
+      summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema());
+    }
+    DataMapRow row = new DataMapRowImpl(schema);
+    int ordinal = 0;
+    int taskMinMaxOrdinal = 0;
+    // add start key as index key
+    row.setByteArray(blockletIndex.getBtreeIndex().getStartKey(), ordinal++);
+
+    BlockletMinMaxIndex minMaxIndex = blockletIndex.getMinMaxIndex();
+    byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen);
+    row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal);
+    // compute and set task level min values
+    addTaskMinMaxValues(summaryRow, minMaxLen,
+        unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], minValues,
+        TASK_MIN_VALUES_INDEX, true);
+    ordinal++;
+    taskMinMaxOrdinal++;
+    byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen);
+    row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal);
+    // compute and set task level max values
+    addTaskMinMaxValues(summaryRow, minMaxLen,
+        unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
+        TASK_MAX_VALUES_INDEX, false);
+    ordinal++;
+
+    row.setInt((int)fileFooter.getNumberOfRows(), ordinal++);
+
+    // add file path
+    byte[] filePathBytes = filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+    row.setByteArray(filePathBytes, ordinal++);
+
+    // add pages
+    row.setShort((short) 0, ordinal++);
+
+    // add version number
+    row.setShort(fileFooter.getVersionId().number(), ordinal++);
+
+    // add schema updated time
+    row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++);
+
+    // add blocklet info
+    row.setByteArray(new byte[0], ordinal++);
+
+    row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal);
+    try {
+      unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return summaryRow;
+  }
+
   private void addTaskSummaryRowToUnsafeMemoryStore(DataMapRow summaryRow,
-      List<String> partitions) {
+      List<String> partitions, byte[] schemaBinary) throws IOException {
     // write the task summary info to unsafe memory store
     if (null != summaryRow) {
+      // Add column schema; it is useful to generate segment properties in the executor,
+      // so we do not need to read the footer again there.
+      if (schemaBinary != null) {
+        summaryRow.setByteArray(schemaBinary, SCHEMA);
+      }
       if (partitions != null && partitions.size() > 0) {
         CarbonRowSchema[] minSchemas =
-            ((CarbonRowSchema.StructCarbonRowSchema) unsafeMemorySummaryDMStore.getSchema()[2])
-                .getChildSchemas();
+            ((CarbonRowSchema.StructCarbonRowSchema) unsafeMemorySummaryDMStore
+                .getSchema()[PARTITION_INFO]).getChildSchemas();
         DataMapRow partitionRow = new DataMapRowImpl(minSchemas);
         for (int i = 0; i < partitions.size(); i++) {
           partitionRow
               .setByteArray(partitions.get(i).getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS),
                   i);
         }
-        summaryRow.setRow(partitionRow, 2);
+        summaryRow.setRow(partitionRow, PARTITION_INFO);
       }
       try {
         unsafeMemorySummaryDMStore.addIndexRowToUnsafe(summaryRow);
@@ -393,6 +481,9 @@ public class BlockletDataMap implements DataMap, Cacheable {
     //for blocklet info
     indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
 
+    // for block footer offset.
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
+
     unsafeMemoryDMStore =
         new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]));
   }
@@ -405,10 +496,14 @@ public class BlockletDataMap implements DataMap, Cacheable {
    * @param partitions
    * @throws MemoryException
    */
-  private void createSummarySchema(SegmentProperties segmentProperties, List<String> partitions)
+  private void createSummarySchema(SegmentProperties segmentProperties, List<String> partitions,
+      byte[] schemaBinary)
       throws MemoryException {
-    List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>(2);
+    List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>();
     getMinMaxSchema(segmentProperties, taskMinMaxSchemas);
+    // for storing column schema
+    taskMinMaxSchemas.add(
+        new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, schemaBinary.length));
     if (partitions != null && partitions.size() > 0) {
       CarbonRowSchema[] mapSchemas = new CarbonRowSchema[partitions.size()];
       for (int i = 0; i < mapSchemas.length; i++) {
@@ -604,18 +699,23 @@ public class BlockletDataMap implements DataMap, Cacheable {
     detailInfo.setBlockletId((short) blockletId);
     detailInfo.setDimLens(columnCardinality);
     detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPADATED_TIME_INDEX));
-    BlockletInfo blockletInfo = new BlockletInfo();
-    try {
-      byte[] byteArray = row.getByteArray(BLOCK_INFO_INDEX);
-      ByteArrayInputStream stream = new ByteArrayInputStream(byteArray);
-      DataInputStream inputStream = new DataInputStream(stream);
-      blockletInfo.readFields(inputStream);
-      inputStream.close();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+    byte[] byteArray = row.getByteArray(BLOCK_INFO_INDEX);
+    BlockletInfo blockletInfo = null;
+    if (byteArray.length > 0) {
+      try {
+        blockletInfo = new BlockletInfo();
+        ByteArrayInputStream stream = new ByteArrayInputStream(byteArray);
+        DataInputStream inputStream = new DataInputStream(stream);
+        blockletInfo.readFields(inputStream);
+        inputStream.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
     }
     detailInfo.setBlockletInfo(blockletInfo);
     blocklet.setDetailInfo(detailInfo);
+    detailInfo.setBlockFooterOffset(row.getLong(BLOCK_FOOTER_OFFSET));
+    detailInfo.setColumnSchemaBinary(getColumnSchemaBinary());
     return blocklet;
   }
 
@@ -728,9 +828,9 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   private List<String> getPartitions() {
     DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
-    if (unsafeRow.getColumnCount() > 2) {
+    if (unsafeRow.getColumnCount() > PARTITION_INFO) {
       List<String> partitions = new ArrayList<>();
-      DataMapRow row = unsafeRow.getRow(2);
+      DataMapRow row = unsafeRow.getRow(PARTITION_INFO);
       for (int i = 0; i < row.getColumnCount(); i++) {
         partitions.add(
             new String(row.getByteArray(i), CarbonCommonConstants.DEFAULT_CHARSET_CLASS));
@@ -740,6 +840,29 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return null;
   }
 
+  private byte[] getColumnSchemaBinary() {
+    DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
+    return unsafeRow.getByteArray(SCHEMA);
+  }
+
+  /**
+   * Convert schema to binary
+   */
+  private byte[] convertSchemaToBinary(List<ColumnSchema> columnSchemas) throws IOException {
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    DataOutput dataOutput = new DataOutputStream(stream);
+    dataOutput.writeShort(columnSchemas.size());
+    for (ColumnSchema columnSchema : columnSchemas) {
+      if (columnSchema.getColumnReferenceId() == null) {
+        columnSchema.setColumnReferenceId(columnSchema.getColumnUniqueId());
+      }
+      columnSchema.write(dataOutput);
+    }
+    byte[] byteArray = stream.toByteArray();
+    // Compress with snappy to reduce the size of schema
+    return Snappy.rawCompress(byteArray, byteArray.length);
+  }
+
   @Override
   public void clear() {
     if (unsafeMemoryDMStore != null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6094af68/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
index b8cffc6..17ad17f 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
@@ -16,14 +16,12 @@
  */
 package org.apache.carbondata.core.indexstore.blockletindex;
 
-import java.io.IOException;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * Wrapper of abstract index
@@ -32,14 +30,8 @@ import org.apache.carbondata.core.util.CarbonUtil;
 public class IndexWrapper extends AbstractIndex {
 
   public IndexWrapper(List<TableBlockInfo> blockInfos) {
-    DataFileFooter fileFooter = null;
-    try {
-      fileFooter = CarbonUtil.readMetadatFile(blockInfos.get(0));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    segmentProperties = new SegmentProperties(fileFooter.getColumnInTable(),
-        fileFooter.getSegmentInfo().getColumnCardinality());
+    segmentProperties = new SegmentProperties(blockInfos.get(0).getDetailInfo().getColumnSchemas(),
+        blockInfos.get(0).getDetailInfo().getDimLens());
     dataRefNode = new BlockletDataRefNodeWrapper(blockInfos, 0,
         segmentProperties.getDimensionColumnsValueSize());
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6094af68/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 03bc50f..694f8ae 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -42,12 +42,15 @@ import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNodeWrapper;
 import org.apache.carbondata.core.indexstore.blockletindex.IndexWrapper;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -135,7 +138,14 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
           tableBlockInfos = new ArrayList<>();
           listMap.put(blockInfo.getFilePath(), tableBlockInfos);
         }
-        tableBlockInfos.add(blockInfo);
+        BlockletDetailInfo blockletDetailInfo = blockInfo.getDetailInfo();
+        // This is the case of old stores where blocklet information is not available, so read
+        // the blocklet information from the block file
+        if (blockletDetailInfo.getBlockletInfo() == null) {
+          readAndFillBlockletInfo(blockInfo, tableBlockInfos, blockletDetailInfo);
+        } else {
+          tableBlockInfos.add(blockInfo);
+        }
       }
       for (List<TableBlockInfo> tableBlockInfos: listMap.values()) {
         indexList.add(new IndexWrapper(tableBlockInfos));
@@ -199,6 +209,30 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     queryModel.setColumnToDictionaryMapping(queryProperties.columnToDictionayMapping);
   }
 
+  /**
+   * Read the file footer of the block file and get the blocklets to query
+   */
+  private void readAndFillBlockletInfo(TableBlockInfo blockInfo,
+      List<TableBlockInfo> tableBlockInfos, BlockletDetailInfo blockletDetailInfo)
+      throws IOException {
+    blockInfo.setBlockOffset(blockletDetailInfo.getBlockFooterOffset());
+    blockInfo.setDetailInfo(null);
+    DataFileFooter fileFooter = CarbonUtil.readMetadatFile(blockInfo);
+    blockInfo.setDetailInfo(blockletDetailInfo);
+    List<BlockletInfo> blockletList = fileFooter.getBlockletList();
+    short count = 0;
+    for (BlockletInfo blockletInfo: blockletList) {
+      TableBlockInfo info = blockInfo.copy();
+      BlockletDetailInfo detailInfo = info.getDetailInfo();
+      detailInfo.setRowCount(blockletInfo.getNumberOfRows());
+      detailInfo.setBlockletInfo(blockletInfo);
+      detailInfo.setPagesCount((short) blockletInfo.getNumberOfPages());
+      detailInfo.setBlockletId(count);
+      tableBlockInfos.add(info);
+      count++;
+    }
+  }
+
   private List<TableBlockUniqueIdentifier> prepareTableBlockUniqueIdentifier(
      List<TableBlockInfo> tableBlockInfos, AbsoluteTableIdentifier absoluteTableIdentifier) {
     List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6094af68/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
index 1873b8c..3c42d0a 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.gson.Gson;
 import org.apache.hadoop.fs.Path;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
@@ -123,6 +124,11 @@ public class CarbonLocalInputSplit {
         carbonLocalInputSplit.getDeleteDeltaFiles());
     Gson gson = new Gson();
    BlockletDetailInfo blockletDetailInfo = gson.fromJson(carbonLocalInputSplit.detailInfo,
        BlockletDetailInfo.class);
+    try {
+      blockletDetailInfo.readColumnSchema(blockletDetailInfo.getColumnSchemaBinary());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
     inputSplit.setDetailInfo(blockletDetailInfo);
     return inputSplit;
   }

