carbondata-commits mailing list archives

From ravipes...@apache.org
Subject carbondata git commit: [CARBONDATA-2472] Fixed: Refactor NonTransactional table code for Index file IO performance
Date Fri, 18 May 2018 08:02:10 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master b88c09707 -> 2f79e140d


[CARBONDATA-2472] Fixed: Refactor NonTransactional table code for Index file IO performance

Problem: for a non-transactional table, the schema of every index file is currently
validated on every query. This causes IO operations on every query.

Root cause: reading all the index files on every query costs a lot of IO time.

Solution: on the first query, read and validate all index files; on subsequent queries,
read and validate only the newly added index files.
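
A minimal sketch of the caching pattern described above (the SegmentIndexCache class
and its methods are illustrative stand-ins, not CarbonData's actual API):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class SegmentIndexCache {
      // segment number -> index files already read and schema-validated
      private final Map<String, Set<String>> segmentMap = new HashMap<>();

      Set<String> getIndexFiles(String segmentNo) throws IOException {
        Set<String> cached = segmentMap.get(segmentNo);
        if (cached == null) {
          // First query on this segment: read every index file, validate its
          // schema against the table schema, and cache the result.
          cached = readAndValidateIndexFiles(segmentNo);
          segmentMap.put(segmentNo, cached);
        }
        // Later queries hit the cache, so only index files not seen before
        // would need to be read and validated.
        return cached;
      }

      private Set<String> readAndValidateIndexFiles(String segmentNo) throws IOException {
        // Stand-in for the real IO and schema comparison.
        return new HashSet<>();
      }
    }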

This closes #2299


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2f79e140
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2f79e140
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2f79e140

Branch: refs/heads/master
Commit: 2f79e140da740828b75bb5c90f830859dfa15a13
Parents: b88c097
Author: ajantha-bhat <ajanthabhat@gmail.com>
Authored: Fri May 11 14:28:46 2018 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Fri May 18 13:31:58 2018 +0530

----------------------------------------------------------------------
 .../blockletindex/BlockletDataMapFactory.java   | 52 ++++++++++++++++++++
 .../schema/table/column/ColumnSchema.java       | 40 +++++++++++++--
 .../hadoop/api/CarbonTableInputFormat.java      | 42 ----------------
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  5 +-
 4 files changed, 90 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f79e140/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index e502251..e56c2d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -49,12 +49,19 @@ import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.BlockletDataMapUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -66,6 +73,7 @@ import org.apache.hadoop.fs.RemoteIterator;
 public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     implements BlockletDetailsFetcher, SegmentPropertiesFetcher, CacheableDataMap {
 
+  private static final Log LOG = LogFactory.getLog(BlockletDataMapFactory.class);
   private static final String NAME = "clustered.btree.blocklet";
 
   public static final DataMapSchema DATA_MAP_SCHEMA =
@@ -115,6 +123,12 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
         segmentMap.get(segment.getSegmentNo());
     if (tableBlockIndexUniqueIdentifiers == null) {
+      CarbonTable carbonTable = this.getCarbonTable();
+      if (!carbonTable.getTableInfo().isTransactionalTable()) {
+        // For a NonTransactional table, compare the schema of all index files with the inferred
+        // schema. If there is a mismatch, throw an exception, as all files must have the same schema.
+        validateSchemaForNewTransactionalTableFiles(segment, carbonTable);
+      }
       tableBlockIndexUniqueIdentifiers =
           BlockletDataMapUtil.getTableBlockUniqueIdentifiers(segment);
       segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers);
@@ -122,6 +136,44 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     return tableBlockIndexUniqueIdentifiers;
   }
 
+  private void validateSchemaForNewTransactionalTableFiles(Segment segment, CarbonTable carbonTable)
+      throws IOException {
+    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+    Map<String, String> indexFiles = segment.getCommittedIndexFile();
+    for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
+      Path indexFile = new Path(indexFileEntry.getKey());
+      org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.inferSchemaFromIndexFile(
+          indexFile.toString(), carbonTable.getTableName());
+      TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+          tableInfo, identifier.getDatabaseName(),
+          identifier.getTableName(),
+          identifier.getTablePath());
+      List<ColumnSchema> indexFileColumnList =
+          wrapperTableInfo.getFactTable().getListOfColumns();
+      List<ColumnSchema> tableColumnList =
+          carbonTable.getTableInfo().getFactTable().getListOfColumns();
+      if (!isSameColumnSchemaList(indexFileColumnList, tableColumnList)) {
+        LOG.error("Schema of " + indexFile.getName()
+            + " doesn't match the table's schema");
+        throw new IOException("All the files don't have the same schema. "
+            + "Unsupported operation on nonTransactional table. Check logs.");
+      }
+    }
+  }
+
+  private boolean isSameColumnSchemaList(List<ColumnSchema> indexFileColumnList,
+      List<ColumnSchema> tableColumnList) {
+    if (indexFileColumnList.size() != tableColumnList.size()) {
+      LOG.error("Index file's column size is " + indexFileColumnList.size()
+          + " but table's column size is " + tableColumnList.size());
+      return false;
+    }
+    for (int i = 0; i < tableColumnList.size(); i++) {
+      if (!indexFileColumnList.get(i).equalsWithStrictCheck(tableColumnList.get(i))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   /**
   * Get the blocklet detail information based on blockletid, blockid and segmentid. This method is
   * exclusively for BlockletDataMapFactory as detail information is only available in this

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f79e140/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
index 1f05f63..80c6a3a 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
@@ -33,11 +33,15 @@ import org.apache.carbondata.core.metadata.schema.table.Writable;
 import org.apache.carbondata.core.metadata.schema.table.WritableUtil;
 import org.apache.carbondata.core.preagg.TimeSeriesUDF;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 /**
  * Store the information about the column meta data present the table
  */
 public class ColumnSchema implements Serializable, Writable {
 
+  private static final Log LOG = LogFactory.getLog(ColumnSchema.class);
   /**
    * serialization version
    */
@@ -330,6 +334,8 @@ public class ColumnSchema implements Serializable, Writable {
         return false;
       }
     } else if (!columnName.equals(other.columnName)) {
+      LOG.error("column name is " + columnName
+          + " but other column name is " + other.columnName);
       return false;
     }
     if (dataType == null) {
@@ -337,6 +343,8 @@ public class ColumnSchema implements Serializable, Writable {
         return false;
       }
     } else if (!dataType.equals(other.dataType)) {
+      LOG.error("column name is " + columnName + " data type is " + dataType
+          + " but other column data type is " + other.dataType);
       return false;
     }
     return true;
@@ -353,18 +361,40 @@ public class ColumnSchema implements Serializable, Writable {
       return false;
     }
     ColumnSchema other = (ColumnSchema) obj;
-    if (!columnUniqueId.equals(other.columnUniqueId) ||
-        (isDimensionColumn != other.isDimensionColumn) ||
-        (scale != other.scale) ||
-        (precision != other.precision) ||
-        (isSortColumn != other.isSortColumn)) {
+    if (!columnUniqueId.equals(other.columnUniqueId)) {
+      LOG.error("Index file's column " + columnName + " columnUniqueId is " + columnUniqueId
+          + " but table's column columnUniqueId is " + other.columnUniqueId);
+      return false;
+    }
+    if (isDimensionColumn != other.isDimensionColumn) {
+      LOG.error("Index file's column " + columnName + " isDimensionColumn is " + isDimensionColumn
+          + " but table's column isDimensionColumn is " + other.isDimensionColumn);
+      return false;
+    }
+    if (scale != other.scale) {
+      LOG.error("Index file's column " + columnName + " scale is " + scale
+          + " but table's column scale is " + other.scale);
+      return false;
+    }
+    if (precision != other.precision) {
+      LOG.error("Index file's column " + columnName + " precision is " + precision
+          + " but table's column precision is " + other.precision);
+      return false;
+    }
+    if (isSortColumn != other.isSortColumn) {
+      LOG.error("Index file's column " + columnName + " isSortColumn is " + isSortColumn
+          + " but table's column isSortColumn is " + other.isSortColumn);
       return false;
     }
     if (encodingList.size() != other.encodingList.size()) {
+      LOG.error("Index file's column " + columnName + " encoding size is " + encodingList.size()
+          + " but table's column encoding size is " + other.encodingList.size());
       return false;
     }
     for (int i = 0; i < encodingList.size(); i++) {
       if (encodingList.get(i).compareTo(other.encodingList.get(i)) != 0) {
+        LOG.error("Index file's column " + columnName + " encoding is " + encodingList.get(i)
+            + " but table's column encoding is " + other.encodingList.get(i));
         return false;
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f79e140/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 1db3138..4feb044 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -36,14 +36,11 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.converter.SchemaConverter;
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
 import org.apache.carbondata.core.metadata.schema.SchemaReader;
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
 import org.apache.carbondata.core.mutate.UpdateVO;
@@ -153,34 +150,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
         .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
 
-    // For NonTransactional table, compare the schema of all index files with inferred schema.
-    // If there is a mismatch throw exception. As all files must be of same schema.
-    if (!carbonTable.getTableInfo().isTransactionalTable()) {
-      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-      for (Segment segment : segments.getValidSegments()) {
-        Map<String, String> indexFiles = segment.getCommittedIndexFile();
-        for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
-          Path indexFile = new Path(indexFileEntry.getKey());
-          org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.inferSchemaFromIndexFile(
-              indexFile.toString(), carbonTable.getTableName());
-          TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
-              tableInfo, identifier.getDatabaseName(),
-              identifier.getTableName(),
-              identifier.getTablePath());
-          List<ColumnSchema> indexFileColumnList =
-              wrapperTableInfo.getFactTable().getListOfColumns();
-          List<ColumnSchema> tableColumnList =
-              carbonTable.getTableInfo().getFactTable().getListOfColumns();
-          if (!compareColumnSchemaList(indexFileColumnList, tableColumnList)) {
-            LOG.error("Schema of " + indexFile.getName()
-                + " doesn't match with the table's schema");
-            throw new IOException("All the files doesn't have same schema. "
-                + "Unsupported operation on nonTransactional table. Check logs.");
-          }
-        }
-      }
-    }
-
     // to check whether only streaming segments access is enabled or not,
     // if access streaming segment is true then data will be read from streaming segments
     boolean accessStreamingSegments = getAccessStreamingSegments(job.getConfiguration());
@@ -296,17 +265,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     return splits;
   }
 
-  private boolean compareColumnSchemaList(List<ColumnSchema> indexFileColumnList,
-      List<ColumnSchema> tableColumnList) {
-    if (indexFileColumnList.size() != tableColumnList.size()) {
-      return false;
-    }
-    for (int i = 0; i < tableColumnList.size(); i++) {
-      return indexFileColumnList.get(i).equalsWithStrictCheck(tableColumnList.get(i));
-    }
-    return false;
-  }
-
   /**
    * Below method will be used to get the filter segments when query is fired on pre Aggregate
    * and main table in case of streaming.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f79e140/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index aacbdd0..b7b28b8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -262,10 +262,11 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
 
     val tableInfo = if (external) {
       // read table info from schema file in the provided table path
+      // for an external table, the table name must also be converted to lower case
       val identifier = AbsoluteTableIdentifier.from(
         tablePath.get,
-        CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession),
-        tableIdentifier.table)
+        CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession).toLowerCase(),
+        tableIdentifier.table.toLowerCase())
       val table = try {
         val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
         if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {

