From jack...@apache.org
Subject [1/2] carbondata git commit: [CARBONDATA-2853] Implement min/max index for streaming segment
Date Wed, 05 Sep 2018 06:22:26 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 526e3bfa1 -> 21a72bf2e
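
This change makes each streaming batch collect per-column min/max statistics and a row
count and fold them into the segment's stream index file; previously the index recorded
num_rows as -1 and an empty BlockletIndex (see the removed TODO further down). The point
of a min/max index is file-level pruning: a filter value outside a file's recorded
[min, max] range means the file cannot contain a match. A minimal, self-contained sketch
of that check (the class and method names are illustrative, not CarbonData APIs;
CarbonData compares dimension values as unsigned bytes, as ByteUtil.UnsafeComparer does
in this patch):

    import java.nio.charset.StandardCharsets;

    public final class MinMaxPruneSketch {

      // unsigned lexicographic comparison of byte arrays, the same ordering
      // that unsigned byte-wise comparers give for dimension values
      static int compareBytes(byte[] a, byte[] b) {
        int len = Math.min(a.length, b.length);
        for (int i = 0; i < len; i++) {
          int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
          if (diff != 0) {
            return diff;
          }
        }
        return a.length - b.length;
      }

      // a file can be skipped for an equality filter when the literal falls
      // outside the file's recorded [min, max] range for that column
      static boolean canSkip(byte[] filterValue, byte[] min, byte[] max) {
        return compareBytes(filterValue, min) < 0 || compareBytes(filterValue, max) > 0;
      }

      public static void main(String[] args) {
        byte[] min = "city_a".getBytes(StandardCharsets.UTF_8);
        byte[] max = "city_m".getBytes(StandardCharsets.UTF_8);
        System.out.println(canSkip("city_z".getBytes(StandardCharsets.UTF_8), min, max)); // true: prune
        System.out.println(canSkip("city_b".getBytes(StandardCharsets.UTF_8), min, max)); // false: scan
      }
    }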


http://git-wip-us.apache.org/repos/asf/carbondata/blob/21a72bf2/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 9e83924..89f00c9 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -19,7 +19,9 @@ package org.apache.carbondata.streaming.segment;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.common.CarbonIterator;
@@ -29,21 +31,31 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.locks.CarbonLockFactory;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.locks.LockUsage;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.reader.CarbonIndexFileReader;
 import org.apache.carbondata.core.statusmanager.FileFormat;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.Comparator;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
 import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.format.BlockletIndex;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.streaming.CarbonStreamRecordWriter;
+import org.apache.carbondata.streaming.index.StreamFileIndex;
 
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -221,10 +233,48 @@ public class StreamSegment {
     }
   }
 
+  public static BlockletMinMaxIndex collectMinMaxIndex(SimpleStatsResult[] dimStats,
+      SimpleStatsResult[] msrStats) {
+    BlockletMinMaxIndex minMaxIndex = new BlockletMinMaxIndex();
+    byte[][] maxIndexes = new byte[dimStats.length + msrStats.length][];
+    for (int index = 0; index < dimStats.length; index++) {
+      maxIndexes[index] =
+          CarbonUtil.getValueAsBytes(dimStats[index].getDataType(), dimStats[index].getMax());
+    }
+    for (int index = 0; index < msrStats.length; index++) {
+      maxIndexes[dimStats.length + index] =
+          CarbonUtil.getValueAsBytes(msrStats[index].getDataType(), msrStats[index].getMax());
+    }
+    minMaxIndex.setMaxValues(maxIndexes);
+
+    byte[][] minIndexes = new byte[maxIndexes.length][];
+    for (int index = 0; index < dimStats.length; index++) {
+      minIndexes[index] =
+          CarbonUtil.getValueAsBytes(dimStats[index].getDataType(), dimStats[index].getMin());
+    }
+    for (int index = 0; index < msrStats.length; index++) {
+      minIndexes[dimStats.length + index] =
+          CarbonUtil.getValueAsBytes(msrStats[index].getDataType(), msrStats[index].getMin());
+    }
+    minMaxIndex.setMinValues(minIndexes);
+    return minMaxIndex;
+  }
+
+  /**
+   * create a StreamFileIndex from the min/max index, measure data types and row count of a batch
+   */
+  private static StreamFileIndex createStreamBlockIndex(String fileName,
+      BlockletMinMaxIndex minMaxIndex, DataType[] msrDataTypes, int blockletRowCount) {
+    StreamFileIndex streamFileIndex =
+        new StreamFileIndex(fileName, minMaxIndex, blockletRowCount);
+    streamFileIndex.setMsrDataTypes(msrDataTypes);
+    return streamFileIndex;
+  }
+
   /**
    * invoke CarbonStreamOutputFormat to append batch data to existing carbondata file
    */
-  public static void appendBatchData(CarbonIterator<Object[]> inputIterators,
+  public static StreamFileIndex appendBatchData(CarbonIterator<Object[]> inputIterators,
       TaskAttemptContext job, CarbonLoadModel carbonLoadModel) throws Exception {
     CarbonStreamRecordWriter writer = null;
     try {
@@ -235,11 +285,15 @@ public class StreamSegment {
           writer.getSegmentDir(),
           writer.getFileName(),
           CarbonTablePath.getCarbonStreamIndexFileName());
-
+      int blockletRowCount = 0;
       while (inputIterators.hasNext()) {
         writer.write(null, inputIterators.next());
+        blockletRowCount++;
       }
       inputIterators.close();
+
+      return createStreamBlockIndex(writer.getFileName(), writer.getBatchMinMaxIndex(),
+          writer.getMeasureDataTypes(), blockletRowCount);
     } catch (Throwable ex) {
       if (writer != null) {
         LOGGER.error(ex, "Failed to append batch data to stream segment: " +
@@ -331,6 +385,7 @@ public class StreamSegment {
             } else if (blockIndex.getFile_size() < file.getSize()) {
               FileFactory.truncateFile(filePath, fileType, blockIndex.getFile_size());
             }
+            break;
           }
         }
       } finally {
@@ -357,11 +412,208 @@ public class StreamSegment {
   }
 
   /**
-   * update carbonindex file after a stream batch.
+   * read the index file into a list of BlockIndex
+   *
+   * @param indexPath path of the index file
+   * @param fileType  file type of the index file
+   * @return the list of BlockIndex in the index file
+   * @throws IOException if the index file could not be read
+   */
+  private static List<BlockIndex> readIndexFile(String indexPath, FileFactory.FileType fileType)
+      throws IOException {
+    List<BlockIndex> blockIndexList = new ArrayList<>();
+    CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);
+    if (index.exists()) {
+      CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+      try {
+        indexReader.openThriftReader(indexPath);
+        while (indexReader.hasNext()) {
+          blockIndexList.add(indexReader.readBlockIndexInfo());
+        }
+      } finally {
+        indexReader.closeThriftReader();
+      }
+    }
+    return blockIndexList;
+  }
+
+  /**
+   * combine the min/max index of the new blocklet with the min/max index of the stream file
+   * 1. if the file index is null, drop the min/max index (backward compatibility)
+   * 2. if the file index is not null,
+   * 2.1 if the blocklet index is empty, use the min/max index of the stream file
+   * 2.2 if the blocklet index is not empty, merge the two indexes
+   */
+  private static void mergeBatchMinMax(StreamFileIndex blockletIndex, BlockletMinMaxIndex fileIndex)
+      throws IOException {
+    if (fileIndex == null) {
+      // backward compatibility
+      // do not create a min/max index for an old stream file (written without a min/max index)
+      blockletIndex.setMinMaxIndex(null);
+      return;
+    }
+
+    DataType[] msrDataTypes = blockletIndex.getMsrDataTypes();
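+    // dimensions are compared on raw bytes, so comparators are only needed for measures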
+    SerializableComparator[] comparators = new SerializableComparator[msrDataTypes.length];
+    for (int index = 0; index < comparators.length; index++) {
+      comparators[index] = Comparator.getComparatorByDataTypeForMeasure(msrDataTypes[index]);
+    }
+
+    // min value
+    byte[][] minValues = blockletIndex.getMinMaxIndex().getMinValues();
+    byte[][] mergedMinValues = fileIndex.getMinValues();
+    if (minValues == null || minValues.length == 0) {
+      // use file index
+      blockletIndex.getMinMaxIndex().setMinValues(mergedMinValues);
+    } else if (mergedMinValues != null && mergedMinValues.length != 0) {
+      if (minValues.length != mergedMinValues.length) {
+        throw new IOException("the lengths of the min values should be same.");
+      }
+      int dimCount = minValues.length - msrDataTypes.length;
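+      // values are laid out as [dimensions..., measures...]; the first dimCount entries are dimensions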
+      for (int index = 0; index < minValues.length; index++) {
+        if (index < dimCount) {
+          if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(minValues[index], mergedMinValues[index])
+              > 0) {
+            minValues[index] = mergedMinValues[index];
+          }
+        } else {
+          Object object = DataTypeUtil.getMeasureObjectFromDataType(
+              minValues[index], msrDataTypes[index - dimCount]);
+          Object mergedObject = DataTypeUtil.getMeasureObjectFromDataType(
+              mergedMinValues[index], msrDataTypes[index - dimCount]);
+          if (comparators[index - dimCount].compare(object, mergedObject) > 0) {
+            minValues[index] = mergedMinValues[index];
+          }
+        }
+      }
+    }
+
+    // max value
+    byte[][] maxValues = blockletIndex.getMinMaxIndex().getMaxValues();
+    byte[][] mergedMaxValues = fileIndex.getMaxValues();
+    if (maxValues == null || maxValues.length == 0) {
+      blockletIndex.getMinMaxIndex().setMaxValues(mergedMaxValues);
+    } else if (mergedMaxValues != null && mergedMaxValues.length != 0) {
+      if (maxValues.length != mergedMaxValues.length) {
+        throw new IOException("the lengths of the max values should be same.");
+      }
+      int dimCount = maxValues.length - msrDataTypes.length;
+      for (int index = 0; index < maxValues.length; index++) {
+        if (index < dimCount) {
+          if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(maxValues[index], mergedMaxValues[index])
+              < 0) {
+            maxValues[index] = mergedMaxValues[index];
+          }
+        } else {
+          Object object = DataTypeUtil.getMeasureObjectFromDataType(
+              maxValues[index], msrDataTypes[index - dimCount]);
+          Object mergedObject = DataTypeUtil.getMeasureObjectFromDataType(
+              mergedMaxValues[index], msrDataTypes[index - dimCount]);
+          if (comparators[index - dimCount].compare(object, mergedObject) < 0) {
+            maxValues[index] = mergedMaxValues[index];
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * merge blocklet min/max values to generate the batch min/max values
+   */
+  public static BlockletMinMaxIndex mergeBlockletMinMax(BlockletMinMaxIndex to,
+      BlockletMinMaxIndex from, DataType[] msrDataTypes) {
+    if (to == null) {
+      return from;
+    }
+    if (from == null) {
+      return to;
+    }
+
+    SerializableComparator[] comparators = new SerializableComparator[msrDataTypes.length];
+    for (int index = 0; index < comparators.length; index++) {
+      comparators[index] = Comparator.getComparatorByDataTypeForMeasure(msrDataTypes[index]);
+    }
+
+    // min value
+    byte[][] minValues = to.getMinValues();
+    byte[][] mergedMinValues = from.getMinValues();
+    int dimCount1 = minValues.length - msrDataTypes.length;
+    for (int index = 0; index < minValues.length; index++) {
+      if (index < dimCount1) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(minValues[index], mergedMinValues[index])
+            > 0) {
+          minValues[index] = mergedMinValues[index];
+        }
+      } else {
+        Object object = DataTypeUtil.getMeasureObjectFromDataType(
+            minValues[index], msrDataTypes[index - dimCount1]);
+        Object mergedObject = DataTypeUtil.getMeasureObjectFromDataType(
+            mergedMinValues[index], msrDataTypes[index - dimCount1]);
+        if (comparators[index - dimCount1].compare(object, mergedObject) > 0) {
+          minValues[index] = mergedMinValues[index];
+        }
+      }
+    }
+
+    // max value
+    byte[][] maxValues = to.getMaxValues();
+    byte[][] mergedMaxValues = from.getMaxValues();
+    int dimCount2 = maxValues.length - msrDataTypes.length;
+    for (int index = 0; index < maxValues.length; index++) {
+      if (index < dimCount2) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(maxValues[index], mergedMaxValues[index])
+            < 0) {
+          maxValues[index] = mergedMaxValues[index];
+        }
+      } else {
+        Object object = DataTypeUtil.getMeasureObjectFromDataType(
+            maxValues[index], msrDataTypes[index - dimCount2]);
+        Object mergedObject = DataTypeUtil.getMeasureObjectFromDataType(
+            mergedMaxValues[index], msrDataTypes[index - dimCount2]);
+        if (comparators[index - dimCount2].compare(object, mergedObject) < 0) {
+          maxValues[index] = mergedMaxValues[index];
+        }
+      }
+    }
+    return to;
+  }
+
+  /**
+   * merge the new blocklet indexes with the old file index to create the new file index
    */
-  public static void updateIndexFile(String segmentDir) throws IOException {
+  private static void updateStreamFileIndex(Map<String, StreamFileIndex> indexMap,
+      String indexPath, FileFactory.FileType fileType) throws IOException {
+    List<BlockIndex> blockIndexList = readIndexFile(indexPath, fileType);
+    for (BlockIndex blockIndex : blockIndexList) {
+      BlockletMinMaxIndex fileIndex = CarbonMetadataUtil
+          .convertExternalMinMaxIndex(blockIndex.getBlock_index().getMin_max_index());
+      StreamFileIndex blockletIndex = indexMap.get(blockIndex.getFile_name());
+      if (blockletIndex == null) {
+        // should index all stream files
+        indexMap.put(blockIndex.getFile_name(),
+            new StreamFileIndex(blockIndex.getFile_name(), fileIndex, blockIndex.getNum_rows()));
+      } else {
+        // merge the file's min/max index into the existing StreamFileIndex
+        blockletIndex.setRowCount(blockletIndex.getRowCount() + blockIndex.getNum_rows());
+        mergeBatchMinMax(blockletIndex, fileIndex);
+      }
+    }
+  }
+
+  /**
+   * update carbon index file after a stream batch.
+   */
+  public static void updateIndexFile(String segmentDir,
+      StreamFileIndex[] blockIndexes) throws IOException {
     FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
     String filePath = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir);
+    // update min/max index
+    Map<String, StreamFileIndex> indexMap = new HashMap<>();
+    for (StreamFileIndex fileIndex : blockIndexes) {
+      indexMap.put(fileIndex.getFileName(), fileIndex);
+    }
+    updateStreamFileIndex(indexMap, filePath, fileType);
+
     String tempFilePath = filePath + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
     CarbonIndexFileWriter writer = new CarbonIndexFileWriter();
     try {
@@ -372,10 +624,19 @@ public class StreamSegment {
         blockIndex = new BlockIndex();
         blockIndex.setFile_name(file.getName());
         blockIndex.setFile_size(file.getSize());
-        // TODO need to collect these information
-        blockIndex.setNum_rows(-1);
         blockIndex.setOffset(-1);
-        blockIndex.setBlock_index(new BlockletIndex());
+        // set min/max index
+        BlockletIndex blockletIndex = new BlockletIndex();
+        blockIndex.setBlock_index(blockletIndex);
+        StreamFileIndex streamFileIndex = indexMap.get(blockIndex.getFile_name());
+        if (streamFileIndex != null) {
+          blockletIndex.setMin_max_index(
+              CarbonMetadataUtil.convertMinMaxIndex(streamFileIndex.getMinMaxIndex()));
+          blockIndex.setNum_rows(streamFileIndex.getRowCount());
+        } else {
+          blockIndex.setNum_rows(-1);
+        }
+        // write block index
         writer.writeThrift(blockIndex);
       }
       writer.close();
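
With this patch appendBatchData returns the StreamFileIndex of the batch it wrote, and
updateIndexFile(segmentDir, blockIndexes) merges those batch indexes with the entries
already in the index file before rewriting it. The merge rule inside mergeBatchMinMax and
mergeBlockletMinMax is element-wise: per column, keep the smaller of the two min values
and the larger of the two max values. A minimal sketch of that rule (plain ints for
brevity; the actual code compares dimensions as unsigned byte arrays and measures through
per-type SerializableComparator instances):

    public final class MergeMinMaxSketch {

      // fold "from" into "to": per column, keep the smaller min and the larger max
      static void merge(int[] toMin, int[] toMax, int[] fromMin, int[] fromMax) {
        for (int i = 0; i < toMin.length; i++) {
          if (fromMin[i] < toMin[i]) {
            toMin[i] = fromMin[i];
          }
          if (fromMax[i] > toMax[i]) {
            toMax[i] = fromMax[i];
          }
        }
      }

      public static void main(String[] args) {
        int[] min = {3, 10};
        int[] max = {9, 20};
        merge(min, max, new int[] {5, 7}, new int[] {12, 15});
        System.out.println(java.util.Arrays.toString(min)); // [3, 7]
        System.out.println(java.util.Arrays.toString(max)); // [12, 20]
      }
    }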

