From: chenliang613@apache.org
To: commits@carbondata.incubator.apache.org
Date: Wed, 20 Jul 2016 10:14:10 -0000
Subject: [42/50] [abbrv] incubator-carbondata git commit: Merge remote-tracking branch 'carbon_master/master' into apache/master

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/core/src/main/java/org/carbondata/scan/result/AbstractScannedResult.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/carbondata/scan/result/AbstractScannedResult.java
index 5d00e32,0000000..9602c41
mode 100644,000000..100644
--- 
a/core/src/main/java/org/carbondata/scan/result/AbstractScannedResult.java +++ b/core/src/main/java/org/carbondata/scan/result/AbstractScannedResult.java @@@ -1,347 -1,0 +1,382 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.carbondata.scan.result; + ++import java.io.ByteArrayOutputStream; ++import java.io.DataOutputStream; ++import java.io.IOException; +import java.math.BigDecimal; +import java.util.Map; + ++import org.carbondata.common.logging.LogService; ++import org.carbondata.common.logging.LogServiceFactory; +import org.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk; +import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; +import org.carbondata.scan.executor.infos.BlockExecutionInfo; +import org.carbondata.scan.executor.infos.KeyStructureInfo; + +/** + * Scanned result class which will store and provide the result on request + */ +public abstract class AbstractScannedResult { + ++ private static final LogService LOGGER = ++ LogServiceFactory.getLogService(AbstractScannedResult.class.getName()); + /** + * current row number + */ + protected int currentRow = -1; + /** + * row mapping indexes + */ + protected int[] rowMapping; + /** + * key size of the fixed length column + */ + private int fixedLengthKeySize; + /** + * total number of rows + */ + private int totalNumberOfRows; + /** + * to keep track of number of rows process + */ + private int rowCounter; + /** + * dimension column data chunk + */ + private DimensionColumnDataChunk[] dataChunks; + /** + * measure column data chunk + */ + private MeasureColumnDataChunk[] measureDataChunks; + /** + * dictionary column block index in file + */ + private int[] dictionaryColumnBlockIndexes; + + /** + * no dictionary column block index in file + */ + private int[] noDictionaryColumnBlockIndexes; + + /** + * column group to is key structure info + * which will be used to get the key from the complete + * column group key + * For example if only one dimension of the column group is selected + * then from complete column group key it will be used to mask the key and + * get the particular column key + */ + private Map columnGroupKeyStructureInfo; + ++ /** ++ * ++ */ ++ private Map complexParentIndexToQueryMap; ++ ++ /** ++ * parent block indexes ++ */ ++ private int[] complexParentBlockIndexes; ++ + public AbstractScannedResult(BlockExecutionInfo blockExecutionInfo) { + this.fixedLengthKeySize = blockExecutionInfo.getFixedLengthKeySize(); + this.noDictionaryColumnBlockIndexes = blockExecutionInfo.getNoDictionaryBlockIndexes(); + this.dictionaryColumnBlockIndexes = blockExecutionInfo.getDictionaryColumnBlockIndex(); + this.columnGroupKeyStructureInfo = blockExecutionInfo.getColumnGroupToKeyStructureInfo(); ++ 
this.complexParentIndexToQueryMap = blockExecutionInfo.getComlexDimensionInfoMap(); ++ this.complexParentBlockIndexes = blockExecutionInfo.getComplexColumnParentBlockIndexes(); + } + + /** + * Below method will be used to set the dimension chunks + * which will be used to create a row + * + * @param dataChunks dimension chunks used in query + */ + public void setDimensionChunks(DimensionColumnDataChunk[] dataChunks) { + this.dataChunks = dataChunks; + } + + /** + * Below method will be used to set the measure column chunks + * + * @param measureDataChunks measure data chunks + */ + public void setMeasureChunks(MeasureColumnDataChunk[] measureDataChunks) { + this.measureDataChunks = measureDataChunks; + } + + /** + * Below method will be used to get the chunk based in measure ordinal + * + * @param ordinal measure ordinal + * @return measure column chunk + */ + public MeasureColumnDataChunk getMeasureChunk(int ordinal) { + return measureDataChunks[ordinal]; + } + + /** + * Below method will be used to get the key for all the dictionary dimensions + * which is present in the query + * + * @param rowId row id selected after scanning + * @return return the dictionary key + */ + protected byte[] getDictionaryKeyArray(int rowId) { + byte[] completeKey = new byte[fixedLengthKeySize]; + int offset = 0; + for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) { + offset += dataChunks[dictionaryColumnBlockIndexes[i]] + .fillChunkData(completeKey, offset, rowId, + columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i])); + } + rowCounter++; + return completeKey; + } + + /** + * Just increment the counter incase of query only on measures. + */ + public void incrementCounter() { + rowCounter ++; + currentRow ++; + } + + /** + * Below method will be used to get the dimension data based on dimension + * ordinal and index + * + * @param dimOrdinal dimension ordinal present in the query + * @param rowId row index + * @return dimension data based on row id + */ + protected byte[] getDimensionData(int dimOrdinal, int rowId) { + return dataChunks[dimOrdinal].getChunkData(rowId); + } + + /** + * Below method will be used to get the dimension key array + * for all the no dictionary dimension present in the query + * + * @param rowId row number + * @return no dictionary keys for all no dictionary dimension + */ + protected byte[][] getNoDictionaryKeyArray(int rowId) { + byte[][] noDictionaryColumnsKeys = new byte[noDictionaryColumnBlockIndexes.length][]; + int position = 0; + for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) { + noDictionaryColumnsKeys[position++] = + dataChunks[noDictionaryColumnBlockIndexes[i]].getChunkData(rowId); + } + return noDictionaryColumnsKeys; + } + + /** + * Below method will be used to get the complex type keys array based + * on row id for all the complex type dimension selected in query + * + * @param rowId row number + * @return complex type key array for all the complex dimension selected in query + */ + protected byte[][] getComplexTypeKeyArray(int rowId) { - return new byte[0][]; ++ byte[][] complexTypeData = new byte[complexParentBlockIndexes.length][]; ++ for (int i = 0; i < complexTypeData.length; i++) { ++ GenericQueryType genericQueryType = ++ complexParentIndexToQueryMap.get(complexParentBlockIndexes[i]); ++ ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); ++ DataOutputStream dataOutput = new DataOutputStream(byteStream); ++ try { ++ genericQueryType.parseBlocksAndReturnComplexColumnByteArray(dataChunks, rowId, 
dataOutput); ++ complexTypeData[i] = byteStream.toByteArray(); ++ } catch (IOException e) { ++ LOGGER.error(e); ++ } finally { ++ CarbonUtil.closeStreams(dataOutput); ++ CarbonUtil.closeStreams(byteStream); ++ } ++ } ++ return complexTypeData; + } + + /** + * @return return the total number of row after scanning + */ + public int numberOfOutputRows() { + return this.totalNumberOfRows; + } + + /** + * to check whether any more row is present in the result + * + * @return + */ + public boolean hasNext() { + return rowCounter < this.totalNumberOfRows; + } + + /** + * As this class will be a flyweight object so + * for one block all the blocklet scanning will use same result object + * in that case we need to reset the counter to zero so + * for new result it will give the result from zero + */ + public void reset() { + rowCounter = 0; + currentRow = -1; + } + + /** + * @param totalNumberOfRows set total of number rows valid after scanning + */ + public void setNumberOfRows(int totalNumberOfRows) { + this.totalNumberOfRows = totalNumberOfRows; + } + + /** + * After applying filter it will return the bit set with the valid row indexes + * so below method will be used to set the row indexes + * + * @param indexes + */ + public void setIndexes(int[] indexes) { + this.rowMapping = indexes; + } + + /** + * Below method will be used to check whether measure value is null or not + * + * @param ordinal measure ordinal + * @param rowIndex row number to be checked + * @return whether it is null or not + */ + protected boolean isNullMeasureValue(int ordinal, int rowIndex) { + return measureDataChunks[ordinal].getNullValueIndexHolder().getBitSet().get(rowIndex); + } + + /** + * Below method will be used to get the measure value of + * long type + * + * @param ordinal measure ordinal + * @param rowIndex row number of the measure value + * @return measure value of long type + */ + protected long getLongMeasureValue(int ordinal, int rowIndex) { + return measureDataChunks[ordinal].getMeasureDataHolder().getReadableLongValueByIndex(rowIndex); + } + + /** + * Below method will be used to get the measure value of double type + * + * @param ordinal measure ordinal + * @param rowIndex row number + * @return measure value of double type + */ + protected double getDoubleMeasureValue(int ordinal, int rowIndex) { + return measureDataChunks[ordinal].getMeasureDataHolder() + .getReadableDoubleValueByIndex(rowIndex); + } + + /** + * Below method will be used to get the measure type of big decimal data type + * + * @param ordinal ordinal of the of the measure + * @param rowIndex row number + * @return measure of big decimal type + */ + protected BigDecimal getBigDecimalMeasureValue(int ordinal, int rowIndex) { + return measureDataChunks[ordinal].getMeasureDataHolder() + .getReadableBigDecimalValueByIndex(rowIndex); + } + + /** + * will return the current valid row id + * + * @return valid row id + */ + public abstract int getCurrenrRowId(); + + /** + * @return dictionary key array for all the dictionary dimension + * selected in query + */ + public abstract byte[] getDictionaryKeyArray(); + + /** + * Return the dimension data based on dimension ordinal + * + * @param dimensionOrdinal dimension ordinal + * @return dimension data + */ + public abstract byte[] getDimensionKey(int dimensionOrdinal); + + /** + * Below method will be used to get the complex type key array + * + * @return complex type key array + */ + public abstract byte[][] getComplexTypeKeyArray(); + + /** + * Below method will be used to get the no 
dictionary key + * array for all the no dictionary dimension selected in query + * + * @return no dictionary key array for all the no dictionary dimension + */ + public abstract byte[][] getNoDictionaryKeyArray(); + + /** + * Below method will be used to to check whether measure value + * is null or for a measure + * + * @param ordinal measure ordinal + * @return is null or not + */ + public abstract boolean isNullMeasureValue(int ordinal); + + /** + * Below method will be used to get the measure value for measure + * of long data type + * + * @param ordinal measure ordinal + * @return long value of measure + */ + public abstract long getLongMeasureValue(int ordinal); + + /** + * Below method will be used to get the value of measure of double + * type + * + * @param ordinal measure ordinal + * @return measure value + */ + public abstract double getDoubleMeasureValue(int ordinal); + + /** + * Below method will be used to get the data of big decimal type + * of a measure + * + * @param ordinal measure ordinal + * @return measure value + */ + public abstract BigDecimal getBigDecimalMeasureValue(int ordinal); +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/core/src/test/java/org/carbondata/core/path/CarbonFormatDirectoryStructureTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/hadoop/src/main/java/org/carbondata/hadoop/CarbonInputFormat.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CarbonCompactionExecutor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --cc integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java index ae1cf73,3d17141..5ff8465 --- a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java +++ b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java @@@ -802,9 -785,8 +789,8 @@@ public final class CarbonLoaderUtil return status; } - public static void writeLoadMetadata(CarbonDataLoadSchema schema, - String databaseName, String tableName, - List listOfLoadFolderDetails) throws IOException { - public static void writeLoadMetadata(CarbonDataLoadSchema schema, String schemaName, ++ public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName, + String tableName, List listOfLoadFolderDetails) throws IOException { CarbonTablePath carbonTablePath = CarbonStorePath .getCarbonTablePath(schema.getCarbonTable().getStorePath(), schema.getCarbonTable().getCarbonTableIdentifier()); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala ---------------------------------------------------------------------- diff --cc integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala index 1d46081,633b0fd..5d43620 --- 
a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala @@@ -17,14 -17,12 +17,15 @@@ package org.apache.spark.sql +import java.io.File + import scala.language.implicitConversions -import org.apache.spark.SparkContext +import org.apache.spark.{Logging, SparkContext} + import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.sql.catalyst.{CatalystConf, ParserDialect} import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog} -import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer} +import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command.PartitionData import org.apache.spark.sql.hive._ @@@ -123,13 -81,11 +124,15 @@@ class CarbonContext object CarbonContext { + val datasourceName: String = "org.apache.carbondata.format" + + val datasourceShortName: String = "carbondata" + + @transient + val LOGGER = LogServiceFactory.getLogService(CarbonContext.getClass.getName) /** - * @param schemaName - Schema Name - * @param cubeName - Cube Name + * @param databaseName - Database Name + * @param tableName - Table Name * @param factPath - Raw CSV data path * @param targetPath - Target path where the file will be split as per partition * @param delimiter - default file delimiter is comma(,) @@@ -197,4 -153,30 +200,28 @@@ cache(sc) = cc } - def datasourceName: String = "org.apache.carbondata.format" - + /** + * + * Requesting the extra executors other than the existing ones. + * @param sc + * @param numExecutors + * @return + */ + final def ensureExecutors(sc: SparkContext, numExecutors: Int): Boolean = { + sc.schedulerBackend match { + case b: CoarseGrainedSchedulerBackend => + val requiredExecutors = numExecutors - b.numExistingExecutors + LOGGER + .info("number of executors is =" + numExecutors + " existing executors are =" + b + .numExistingExecutors + ) + if(requiredExecutors > 0) { + b.requestExecutors(requiredExecutors) + } + true + case _ => + false + } + + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala ---------------------------------------------------------------------- diff --cc integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala index 35d989a,3035a47..4e938fd --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala @@@ -245,8 -229,7 +236,8 @@@ case class CarbonRelation .map(x => AttributeReference(x.getColName, CarbonMetastoreTypes.toDataType( metaData.carbonTable.getMeasureByName(factTable, x.getColName).getDataType.toString .toLowerCase match { - case "int" => "double" - case "short" => "double" + case "int" => "long" ++ case "short" => "long" case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")" case others => others }), @@@ -266,6 -249,27 +257,27 @@@ } } + private var tableStatusLastUpdateTime = 0L + + private var sizeInBytesLocalValue = 0L + + def sizeInBytes: Long = { + val 
tableStatusNewLastUpdatedTime = new SegmentStatusManager( - cubeMeta.carbonTable.getAbsoluteTableIdentifier) ++ tableMeta.carbonTable.getAbsoluteTableIdentifier) + .getTableStatusLastModifiedTime + if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) { + val tablePath = CarbonStorePath.getCarbonTablePath( - cubeMeta.storePath, - cubeMeta.carbonTableIdentifier).getPath ++ tableMeta.storePath, ++ tableMeta.carbonTableIdentifier).getPath + val fileType = FileFactory.getFileType(tablePath) + if(FileFactory.isFileExist(tablePath, fileType)) { + tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime + sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath) + } + } + sizeInBytesLocalValue + } + } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --cc integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 25abbd5,3a0e21c..22887e7 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@@ -145,9 -147,12 +145,12 @@@ case class DataLoadTableFileMapping(tab case class CarbonMergerMapping(storeLocation: String, hdfsStoreLocation: String, partitioner: Partitioner, metadataFilePath: String, mergedLoadName: String, - kettleHomePath: String, cubeCreationTime: Long, schemaName: String, + kettleHomePath: String, tableCreationTime: Long, databaseName: String, factTableName: String, validSegments: Array[String], tableId: String) + case class NodeInfo(TaskId: String, noOfBlocks: Int) + + case class AlterTableModel(dbName: Option[String], tableName: String, compactionType: String) case class CompactionModel(compactionSize: Long, @@@ -1013,21 -1454,29 +1016,30 @@@ private[sql] case class LoadTable def run(sqlContext: SQLContext): Seq[Row] = { - val schemaName = getDB.getDatabaseName(schemaNameOp, sqlContext) + val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext) + val identifier = TableIdentifier(tableName, Option(dbName)) if (isOverwriteExist) { - sys.error("Overwrite is not supported for carbon table with " + schemaName + "." + tableName) + sys.error("Overwrite is not supported for carbon table with " + dbName + "." + tableName) } if (null == org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance - .getCarbonTable(schemaName + "_" + tableName)) { - logError("Data loading failed. table not found: " + schemaName + "." + tableName) - LOGGER.audit(s"Data loading failed. table not found: $schemaName.$tableName") - sys.error("Data loading failed. table not found: " + schemaName + "." + tableName) + .getCarbonTable(dbName + "_" + tableName)) { + logError("Data loading failed. table not found: " + dbName + "." + tableName) + LOGGER.audit("Data loading failed. table not found: " + dbName + "." + tableName) + sys.error("Data loading failed. table not found: " + dbName + "." 
+ tableName) } + + val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog - .lookupRelation1(Option(schemaName), tableName, None)(sqlContext) ++ .lookupRelation1(Option(dbName), tableName)(sqlContext) + .asInstanceOf[CarbonRelation] + if (relation == null) { - sys.error(s"Table $schemaName.$tableName does not exist") ++ sys.error(s"Table $dbName.$tableName does not exist") + } CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false") - val carbonLock = CarbonLockFactory.getCarbonLockObj(org.carbondata.core. - carbon.metadata.CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName). - getMetaDataFilepath, LockUsage.METADATA_LOCK) + val carbonLock = CarbonLockFactory - .getCarbonLockObj(relation.cubeMeta.carbonTable.getAbsoluteTableIdentifier ++ .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier + .getCarbonTableIdentifier, + LockUsage.METADATA_LOCK + ) try { if (carbonLock.lockWithRetries()) { logInfo("Successfully able to get the table metadata file lock") @@@ -1037,15 -1486,10 +1049,10 @@@ } val factPath = FileUtils.getPaths(CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser)) - val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog - .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation] - if (relation == null) { - sys.error(s"Table $dbName.$tableName does not exist") - } val carbonLoadModel = new CarbonLoadModel() - carbonLoadModel.setTableName(relation.cubeMeta.carbonTableIdentifier.getTableName) - carbonLoadModel.setDatabaseName(relation.cubeMeta.carbonTableIdentifier.getDatabaseName) - carbonLoadModel.setStorePath(relation.cubeMeta.storePath) + carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName) + carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName) + carbonLoadModel.setStorePath(relation.tableMeta.storePath) if (dimFilesPath.isEmpty) { carbonLoadModel.setDimFolderPath(null) } @@@ -1144,13 -1596,14 +1159,14 @@@ fileHeader, escapeChar, multiLine)(sqlContext.asInstanceOf[HiveContext]) carbonLoadModel.setFactFilePath(FileUtils.getPaths(partitionLocation)) + carbonLoadModel.setColDictFilePath(columnDict) } GlobalDictionaryUtil - .generateGlobalDictionary(sqlContext, carbonLoadModel, relation.cubeMeta.storePath) + .generateGlobalDictionary(sqlContext, carbonLoadModel, relation.tableMeta.storePath) CarbonDataRDDFactory - .loadCarbonData(sqlContext, carbonLoadModel, storeLocation, relation.cubeMeta.storePath, + .loadCarbonData(sqlContext, carbonLoadModel, storeLocation, relation.tableMeta.storePath, kettleHomePath, - relation.cubeMeta.partitioner, columinar, isAgg = false, partitionStatus) + relation.tableMeta.partitioner, columinar, isAgg = false, partitionStatus) } catch { case ex: Exception => @@@ -1465,22 -2094,9 +1486,9 @@@ private[sql] case class DescribeCommand } } - private[sql] case class DescribeNativeCommand(sql: String, - override val output: Seq[Attribute]) - extends RunnableCommand { - override def run(sqlContext: SQLContext): Seq[Row] = { - val output = sqlContext.asInstanceOf[HiveContext].catalog.client.runSqlHive(sql) - output.map(x => { - val row = x.split("\t", -3) - Row(row(0), row(1), row(2)) - } - ).tail - } - } - private[sql] case class DeleteLoadByDate( - schemaNameOp: Option[String], - cubeName: String, + databaseNameOp: Option[String], + tableName: String, dateField: String, dateValue: String ) extends RunnableCommand { 
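The LoadTable hunk above serializes data loads behind a table metadata lock: it obtains the lock through CarbonLockFactory.getCarbonLockObj(..., LockUsage.METADATA_LOCK), proceeds only when lockWithRetries() succeeds, and releases it once the load finishes. A minimal sketch of that acquire-with-retries/release pattern, written against a hypothetical Lock trait rather than the real org.carbondata lock classes, could look like this:

trait Lock {
  // hypothetical stand-in for the CarbonData lock interface, for illustration only
  def lockWithRetries(): Boolean
  def unlock(): Boolean
}

def withMetadataLock[T](lock: Lock)(body: => T): T = {
  if (!lock.lockWithRetries()) {
    // fail fast when the metadata lock cannot be taken, as the command above does
    sys.error("Unable to acquire the table metadata lock; another operation may be running")
  }
  try {
    body            // e.g. build the CarbonLoadModel and run the load
  } finally {
    lock.unlock()   // always release, whether the load succeeded or failed
  }
}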
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala ---------------------------------------------------------------------- diff --cc integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala index 95c78bb,4209d82..8cbe26b --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala @@@ -257,19 -339,9 +257,8 @@@ class CarbonStrategies(sqlContext: SQLC val resolvedTable = sqlContext.executePlan(describe.table).analyzed val resultPlan = sqlContext.executePlan(resolvedTable).executedPlan ExecutedCommand(DescribeCommandFormatted(resultPlan, plan.output, tblIdentifier)) :: Nil - } - else { + } else { - ExecutedCommand(DescribeNativeCommand(sql, plan.output)) :: Nil - } - case describe@LogicalDescribeCommand(table, isExtended) => - val resolvedTable = sqlContext.executePlan(describe.table).analyzed - resolvedTable match { - case t: MetastoreRelation => - ExecutedCommand( - DescribeHiveTableCommand(t, describe.output, describe.isExtended)) :: Nil - case o: LogicalPlan => - val resultPlan = sqlContext.executePlan(o).executedPlan - ExecutedCommand( - RunnableDescribeCommand(resultPlan, describe.output, describe.isExtended)) :: Nil + ExecutedCommand(HiveNativeCommand(sql)) :: Nil } case _ => Nil http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --cc integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 473aa43,7271ee0..2024f5b --- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@@ -30,8 -30,9 +30,9 @@@ import org.apache.hadoop.conf.{Configur import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileSplit import org.apache.spark.{Logging, Partition, SparkContext, SparkEnv} -import org.apache.spark.sql.{CarbonEnv, CarbonRelation, SQLContext} +import org.apache.spark.sql.{CarbonEnv, SQLContext} import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, Partitioner} + import org.apache.spark.sql.hive.DistributionUtil import org.apache.spark.util.{FileUtils, SplitUtils} import org.carbondata.common.logging.LogServiceFactory @@@ -184,7 -186,9 +185,9 @@@ object CarbonDataRDDFactory extends Log // Save the load metadata val carbonLock = CarbonLockFactory - .getCarbonLockObj(table.getMetaDataFilepath, LockUsage.METADATA_LOCK) - .getCarbonLockObj(cube.getAbsoluteTableIdentifier.getCarbonTableIdentifier, ++ .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, + LockUsage.METADATA_LOCK + ) try { if (carbonLock.lockWithRetries()) { logInfo("Successfully got the table metadata file lock") @@@ -467,10 
-525,12 +472,12 @@@ val compactionModel = CompactionModel(compactionSize, CompactionType.MINOR_COMPACTION, carbonTable, - cubeCreationTime + tableCreationTime ) val lock = CarbonLockFactory - .getCarbonLockObj(carbonTable.getMetaDataFilepath, LockUsage.COMPACTION_LOCK) + .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier, + LockUsage.COMPACTION_LOCK + ) var storeLocation = "" var configuredStore = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf) @@@ -863,7 -942,9 +878,9 @@@ currentRestructNumber = 0 } val carbonLock = CarbonLockFactory - .getCarbonLockObj(table.getMetaDataFilepath, LockUsage.METADATA_LOCK) - .getCarbonLockObj(cube.getAbsoluteTableIdentifier.getCarbonTableIdentifier, ++ .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, + LockUsage.METADATA_LOCK + ) try { if (carbonLock.lockWithRetries()) { deleteLoadsAndUpdateMetadata(carbonLoadModel, http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala ---------------------------------------------------------------------- diff --cc integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala index d3f7913,9abcceb..ba0eae7 --- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala +++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala @@@ -30,10 -32,21 +32,20 @@@ import org.apache.spark.sql.Ro import org.carbondata.common.logging.LogServiceFactory import org.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier} + import org.carbondata.core.carbon.metadata.datatype.DataType + import org.carbondata.core.carbon.metadata.encoder.Encoding import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension import org.carbondata.core.constants.CarbonCommonConstants - import org.carbondata.spark.load.CarbonLoaderUtil + import org.carbondata.core.datastorage.store.impl.FileFactory -import org.carbondata.core.locks.CarbonLockFactory -import org.carbondata.core.locks.LockUsage + import org.carbondata.core.util.CarbonTimeStatisticsFactory ++import org.carbondata.lcm.locks.CarbonLockFactory + import org.carbondata.processing.etl.DataLoadingException -import org.carbondata.spark.load.{CarbonLoaderUtil, CarbonLoadModel} ++import org.carbondata.spark.load.{CarbonLoadModel, CarbonLoaderUtil} + import org.carbondata.spark.partition.reader.{CSVParser, CSVReader} + import org.carbondata.spark.tasks.DictionaryWriterTask + import org.carbondata.spark.tasks.SortIndexWriterTask import org.carbondata.spark.util.GlobalDictionaryUtil + import org.carbondata.spark.util.GlobalDictionaryUtil._ /** * A partitioner partition by column. 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --cc integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala index f26cedf,e2dc900..e22869c --- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala @@@ -36,10 -39,10 +39,9 @@@ import org.carbondata.core.util.CarbonP import org.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit} import org.carbondata.integration.spark.merger.{CarbonCompactionExecutor, CarbonCompactionUtil, RowResultMerger} import org.carbondata.processing.util.CarbonDataProcessorUtil -import org.carbondata.query.carbon.result.RowResult -import org.carbondata.query.carbon.result.iterator.RawResultIterator +import org.carbondata.scan.result.iterator.RawResultIterator import org.carbondata.spark.MergeResult import org.carbondata.spark.load.{CarbonLoaderUtil, CarbonLoadModel} - import org.carbondata.spark.merger.CarbonDataMergerUtil import org.carbondata.spark.splits.TableSplit import org.carbondata.spark.util.QueryPlanUtil @@@ -169,18 -173,30 +172,30 @@@ class CarbonMergerRDD[K, V] } override def getPartitions: Array[Partition] = { + val startTime = System.currentTimeMillis() val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier( - hdfsStoreLocation, new CarbonTableIdentifier(schemaName, factTableName, tableId) + hdfsStoreLocation, new CarbonTableIdentifier(databaseName, factTableName, tableId) ) - val (carbonInputFormat: CarbonInputFormat[RowResult], job: Job) = + val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) = QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier) val result = new util.ArrayList[Partition](defaultParallelism) - val mapsOfNodeBlockMapping: util.List[util.Map[String, util.List[TableBlockInfo]]] = new - java.util.ArrayList[util.Map[String, util.List[TableBlockInfo]]]() + + // mapping of the node and block list. + var nodeMapping: util.Map[String, util.List[Distributable]] = new + util.HashMap[String, util.List[Distributable]] + var noOfBlocks = 0 + + var taskInfoList = new util.ArrayList[Distributable] + + // for each valid segment. for (eachSeg <- carbonMergerMapping.validSegments) { + // map for keeping the relation of a task and its blocks. + val taskIdMapping: util.Map[String, util.List[TableBlockInfo]] = new + util.HashMap[String, util.List[TableBlockInfo]] + job.getConfiguration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg) // get splits http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala ---------------------------------------------------------------------- diff --cc integration/spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala index 056e336,2680d7d..b841fed --- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala +++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala @@@ -83,12 -83,26 +83,26 @@@ object Compactor carbonLoadModel.setLoadMetadataDetails(segmentStatusManager .readLoadMetadata(carbonTable.getMetaDataFilepath()).toList.asJava ) + var execInstance = "1" + // in case of non dynamic executor allocation, number of executors are fixed. 
+ if (sc.sparkContext.getConf.contains("spark.executor.instances")) { + execInstance = sc.sparkContext.getConf.get("spark.executor.instances") + logger.info("spark.executor.instances property is set to =" + execInstance) + } // in case of dynamic executor allocation, taking the max executors of the dynamic allocation. + else if (sc.sparkContext.getConf.contains("spark.dynamicAllocation.enabled")) { + if (sc.sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim + .equalsIgnoreCase("true")) { + execInstance = sc.sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors") + logger.info("spark.dynamicAllocation.maxExecutors property is set to =" + execInstance) + } + } val mergeStatus = new CarbonMergerRDD( - sc.sparkContext, + sqlContext.sparkContext, new MergeResultImpl(), carbonLoadModel, - carbonMergerMapping + carbonMergerMapping, + execInstance ).collect if(mergeStatus.length == 0) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/test/scala/org/carbondata/spark/testsuite/aggquery/AllDataTypesTestCaseAggregate.scala ---------------------------------------------------------------------- diff --cc integration/spark/src/test/scala/org/carbondata/spark/testsuite/aggquery/AllDataTypesTestCaseAggregate.scala index 79ba649,196ca45..2569968 --- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/aggquery/AllDataTypesTestCaseAggregate.scala +++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/aggquery/AllDataTypesTestCaseAggregate.scala @@@ -42,8 -42,18 +42,18 @@@ class AllDataTypesTestCaseAggregate ext "String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance " + "int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'") sql( - "LOAD DATA LOCAL INPATH './src/test/resources/data.csv' INTO TABLE alldatatypescubeAGG " + + "LOAD DATA LOCAL INPATH './src/test/resources/data.csv' INTO TABLE alldatatypestableAGG " + "OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')") + + sql( + "CREATE TABLE alldatatypescubeAGG_hive (empno int, empname String, designation String, doj " + + "Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname " + + "String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance " + + "int,utilization int,salary int)row format delimited fields terminated by ','") + sql( + "LOAD DATA LOCAL INPATH './src/test/resources/datawithoutheader.csv' INTO TABLE alldatatypescubeAGG_hive") + + } test( @@@ -52,9 -62,11 +62,11 @@@ { checkAnswer( sql( - "select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubeAGG where" + + "select empno,empname,utilization,count(salary),sum(empno) from alldatatypestableAGG where" + " empname in ('arvind','ayushi') group by empno,empname,utilization"), - Seq(Row(11, "arvind", 96.2, 1, 11), Row(15, "ayushi", 91.5, 1, 15))) + sql( + "select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubeAGG_hive where" + + " empname in ('arvind','ayushi') group by empno,empname,utilization")) } test( @@@ -63,9 -75,11 +75,11 @@@ { checkAnswer( sql( - "select empname,trim(designation),avg(salary),avg(empno) from alldatatypescubeAGG where " + + "select empname,trim(designation),avg(salary),avg(empno) from 
alldatatypestableAGG where " + "empname in ('arvind','ayushi') group by empname,trim(designation)"), - Seq(Row("arvind", "SE", 5040.56, 11.0), Row("ayushi", "SSA", 13245.48, 15.0))) + sql( + "select empname,trim(designation),avg(salary),avg(empno) from alldatatypescubeAGG_hive where " + + "empname in ('arvind','ayushi') group by empname,trim(designation)")) } test( @@@ -76,9 -90,12 +90,12 @@@ checkAnswer( sql( "select empname,length(designation),max(empno),min(empno), avg(empno) from " + - "alldatatypescubeAGG where empname in ('arvind','ayushi') group by empname,length" + + "alldatatypestableAGG where empname in ('arvind','ayushi') group by empname,length" + "(designation) order by empname"), - Seq(Row("arvind", 2, 11, 11, 11.0), Row("ayushi", 3, 15, 15, 15.0))) + sql( + "select empname,length(designation),max(empno),min(empno), avg(empno) from " + + "alldatatypescubeAGG_hive where empname in ('arvind','ayushi') group by empname,length" + + "(designation) order by empname")) } override def afterAll { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala ---------------------------------------------------------------------- diff --cc integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala index d13a9df,c850bd1..a64e6ac --- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala +++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala @@@ -44,9 -44,11 +44,13 @@@ class AllDataTypesTestCaseAggregate ext CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) sql("LOAD DATA LOCAL INPATH '"+currentDirectory+"/src/test/resources/100_olap.csv' INTO table Carbon_automation_test options('DELIMITER'= ',' ,'QUOTECHAR'= '\"', 'FILEHEADER'= 'imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Lat est_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')"); + sql("create table if not exists Carbon_automation_hive (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry 
string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Activ e_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string, gamePointId int,contractNumber int) row format delimited fields terminated by ','"); + sql("LOAD DATA LOCAL INPATH '"+currentDirectory+"/src/test/resources/100_olap.csv' INTO table Carbon_automation_hive "); + //hive table + sql("create table Carbon_automation_test_hive (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string,contractNumber int, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion str ing, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointId int,gamePointDescription string)row format delimited fields terminated by ','"); + sql("LOAD DATA LOCAL INPATH '"+currentDirectory+"/src/test/resources/100_olap.csv' INTO table Carbon_automation_test_hive"); + } override def afterAll { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala ---------------------------------------------------------------------- 
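The Compactor.scala hunk a few sections above decides how many executors to request before building the CarbonMergerRDD: it takes spark.executor.instances when executors are statically allocated, and falls back to spark.dynamicAllocation.maxExecutors when dynamic allocation is enabled. A standalone sketch of that lookup, using only SparkConf and a hypothetical helper name (not part of the CarbonData sources), might read:

import org.apache.spark.SparkConf

// Hypothetical helper illustrating the executor-count lookup from Compactor.scala above;
// defaults to "1" when neither static nor dynamic allocation settings are present.
def resolveExecutorInstances(conf: SparkConf): String = {
  if (conf.contains("spark.executor.instances")) {
    conf.get("spark.executor.instances")
  } else if (conf.getBoolean("spark.dynamicAllocation.enabled", defaultValue = false)) {
    conf.get("spark.dynamicAllocation.maxExecutors", "1")
  } else {
    "1"
  }
}

// Example: a configuration with dynamic allocation enabled resolves to its maximum.
val conf = new SparkConf(loadDefaults = false)
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.maxExecutors", "8")
println(resolveExecutorInstances(conf))   // prints 8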
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/test/scala/org/carbondata/spark/testsuite/deleteTable/TestDeleteTableNewDDL.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/test/scala/org/carbondata/spark/testsuite/detailquery/AllDataTypesTestCase.scala ---------------------------------------------------------------------- diff --cc integration/spark/src/test/scala/org/carbondata/spark/testsuite/detailquery/AllDataTypesTestCase.scala index 9fe9ba6,007ed9d..16ab932 --- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/detailquery/AllDataTypesTestCase.scala +++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/detailquery/AllDataTypesTestCase.scala @@@ -33,17 -33,22 +33,22 @@@ import org.scalatest.BeforeAndAfterAl class AllDataTypesTestCase extends QueryTest with BeforeAndAfterAll { override def beforeAll { - sql("CREATE TABLE alldatatypescube (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'") - sql("LOAD DATA LOCAL INPATH './src/test/resources/data.csv' INTO TABLE alldatatypescube OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')"); - - sql("CREATE TABLE alldatatypescube_hive (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int)row format delimited fields terminated by ','") - sql("LOAD DATA local inpath './src/test/resources/datawithoutheader.csv' INTO TABLE alldatatypescube_hive"); + sql("CREATE TABLE alldatatypestable (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'") + sql("LOAD DATA LOCAL INPATH './src/test/resources/data.csv' INTO TABLE alldatatypestable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')"); ++ ++ sql("CREATE TABLE alldatatypestable_hive (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int)row format delimited fields terminated by ','") ++ sql("LOAD DATA local inpath './src/test/resources/datawithoutheader.csv' INTO TABLE alldatatypestable_hive"); + } - test("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescube where empname in ('arvind','ayushi') group by empno,empname,utilization") { + test("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestable where empname in ('arvind','ayushi') group by empno,empname,utilization") { checkAnswer( - sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescube where empname in ('arvind','ayushi') group by empno,empname,utilization"), - sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescube_hive where empname in ('arvind','ayushi') group by 
empno,empname,utilization")) + sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestable where empname in ('arvind','ayushi') group by empno,empname,utilization"), - Seq(Row(11, "arvind", 96.2, 1, 11), Row(15, "ayushi", 91.5, 1, 15))) ++ sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestable_hive where empname in ('arvind','ayushi') group by empno,empname,utilization")) } override def afterAll { - sql("drop table alldatatypescube") - sql("drop table alldatatypescube_hive") + sql("drop table alldatatypestable") ++ sql("drop table alldatatypestable_hive") } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/test/scala/org/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala ---------------------------------------------------------------------- diff --cc integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala index 670981f,d9abe75..161a19e --- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala +++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala @@@ -33,17 -33,22 +33,22 @@@ import org.scalatest.BeforeAndAfterAl class AllDataTypesTestCaseFilter extends QueryTest with BeforeAndAfterAll { override def beforeAll { - sql("CREATE TABLE alldatatypescubeFilter (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'") - sql("LOAD DATA local inpath './src/test/resources/data.csv' INTO TABLE alldatatypescubeFilter OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')"); + sql("CREATE TABLE alldatatypestableFilter (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'") + sql("LOAD DATA local inpath './src/test/resources/data.csv' INTO TABLE alldatatypestableFilter OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')"); + - sql("CREATE TABLE alldatatypescubeFilter_hive (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int)row format delimited fields terminated by ','") - sql("LOAD DATA local inpath './src/test/resources/datawithoutheader.csv' INTO TABLE alldatatypescubeFilter_hive"); ++ sql("CREATE TABLE alldatatypestableFilter_hive (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int)row format delimited fields terminated by ','") ++ sql("LOAD DATA local inpath './src/test/resources/datawithoutheader.csv' 
INTO TABLE alldatatypestableFilter_hive"); + } - test("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubeFilter where empname in ('arvind','ayushi') group by empno,empname,utilization") { + test("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestableFilter where empname in ('arvind','ayushi') group by empno,empname,utilization") { checkAnswer( - sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubeFilter where empname in ('arvind','ayushi') group by empno,empname,utilization"), - sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubeFilter_hive where empname in ('arvind','ayushi') group by empno,empname,utilization")) + sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestableFilter where empname in ('arvind','ayushi') group by empno,empname,utilization"), - Seq(Row(11, "arvind", 96.2, 1, 11), Row(15, "ayushi", 91.5, 1, 15))) ++ sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestableFilter_hive where empname in ('arvind','ayushi') group by empno,empname,utilization")) } override def afterAll { - sql("drop table alldatatypescubeFilter") - sql("drop table alldatatypescubeFilter_hive") + sql("drop table alldatatypestableFilter") ++ sql("drop table alldatatypestableFilter_hive") } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/AllDataTypesTestCaseJoin.scala ---------------------------------------------------------------------- diff --cc integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/AllDataTypesTestCaseJoin.scala index fd5b57d,e0f6ae5..919dff3 --- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/AllDataTypesTestCaseJoin.scala +++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/AllDataTypesTestCaseJoin.scala @@@ -33,17 -33,22 +33,22 @@@ import org.scalatest.BeforeAndAfterAl class AllDataTypesTestCaseJoin extends QueryTest with BeforeAndAfterAll { override def beforeAll { - sql("CREATE TABLE alldatatypescubeJoin (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'") - sql("LOAD DATA local inpath './src/test/resources/data.csv' INTO TABLE alldatatypescubeJoin OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')"); + sql("CREATE TABLE alldatatypestableJoin (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'") + sql("LOAD DATA local inpath './src/test/resources/data.csv' INTO TABLE alldatatypestableJoin OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')"); + - sql("CREATE TABLE alldatatypescubeJoin_hive (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int)row format delimited fields terminated by ','") - sql("LOAD DATA local inpath './src/test/resources/datawithoutheader.csv' 
INTO TABLE alldatatypescubeJoin_hive"); ++ sql("CREATE TABLE alldatatypestableJoin_hive (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int)row format delimited fields terminated by ','") ++ sql("LOAD DATA local inpath './src/test/resources/datawithoutheader.csv' INTO TABLE alldatatypestableJoin_hive"); + } - test("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubeJoin where empname in ('arvind','ayushi') group by empno,empname,utilization") { + test("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestableJoin where empname in ('arvind','ayushi') group by empno,empname,utilization") { checkAnswer( - sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubeJoin where empname in ('arvind','ayushi') group by empno,empname,utilization"), - sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubeJoin_hive where empname in ('arvind','ayushi') group by empno,empname,utilization")) + sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestableJoin where empname in ('arvind','ayushi') group by empno,empname,utilization"), - Seq(Row(11, "arvind", 96.2, 1, 11), Row(15, "ayushi", 91.5, 1, 15))) ++ sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestableJoin_hive where empname in ('arvind','ayushi') group by empno,empname,utilization")) } override def afterAll { - sql("drop table alldatatypescubeJoin") - sql("drop table alldatatypescubeJoin_hive") + sql("drop table alldatatypestableJoin") ++ sql("drop table alldatatypestableJoin_hive") } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/test/scala/org/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala ---------------------------------------------------------------------- diff --cc integration/spark/src/test/scala/org/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala index 4aab691,3b9c98d..708fdb9 --- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala +++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala @@@ -33,17 -33,22 +33,22 @@@ import org.scalatest.BeforeAndAfterAl class AllDataTypesTestCaseSort extends QueryTest with BeforeAndAfterAll { override def beforeAll { - sql("CREATE TABLE alldatatypescubesort (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'") - sql("LOAD DATA local inpath './src/test/resources/data.csv' INTO TABLE alldatatypescubesort OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')"); + sql("CREATE TABLE alldatatypestablesort (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'") + sql("LOAD DATA local inpath './src/test/resources/data.csv' INTO TABLE alldatatypestablesort OPTIONS('DELIMITER'= ',', 
'QUOTECHAR'= '\"')"); + - sql("CREATE TABLE alldatatypescubesort_hive (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int)row format delimited fields terminated by ','") - sql("LOAD DATA local inpath './src/test/resources/datawithoutheader.csv' INTO TABLE alldatatypescubesort_hive"); ++ sql("CREATE TABLE alldatatypestablesort_hive (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int)row format delimited fields terminated by ','") ++ sql("LOAD DATA local inpath './src/test/resources/datawithoutheader.csv' INTO TABLE alldatatypestablesort_hive"); + } - test("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubesort where empname in ('arvind','ayushi') group by empno,empname,utilization order by empno") { + test("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestablesort where empname in ('arvind','ayushi') group by empno,empname,utilization order by empno") { checkAnswer( - sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubesort where empname in ('arvind','ayushi') group by empno,empname,utilization order by empno"), - sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypescubesort_hive where empname in ('arvind','ayushi') group by empno,empname,utilization order by empno")) + sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestablesort where empname in ('arvind','ayushi') group by empno,empname,utilization order by empno"), - Seq(Row(11, "arvind", 96.2, 1, 11), Row(15, "ayushi", 91.5, 1, 15))) ++ sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestablesort_hive where empname in ('arvind','ayushi') group by empno,empname,utilization order by empno")) } override def afterAll { - sql("drop table alldatatypescubesort") - sql("drop table alldatatypescubesort_hive") + sql("drop table alldatatypestablesort") ++ sql("drop table alldatatypestablesort_hive") } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/test/scala/org/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala ---------------------------------------------------------------------- diff --cc integration/spark/src/test/scala/org/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala index 0edc9e9,7596188..69d8b33 --- a/integration/spark/src/test/scala/org/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala +++ b/integration/spark/src/test/scala/org/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala @@@ -106,9 -107,19 +107,19 @@@ class AutoHighCardinalityIdentifyTestCa } } - def relation: CarbonRelation = { + def buildColGrpHighCardTable() { + try { + sql("drop table if exists colgrp_highcard") + sql("""create table if not exists colgrp_highcard + (hc1 string, c2 string, c3 int) + STORED BY 'org.apache.carbondata.format' tblproperties('COLUMN_GROUPS'='(hc1,c2)')""") + } catch { + case ex: Throwable => logError(ex.getMessage + "\r\n" + ex.getStackTraceString) - } ++ } + } + def relation(tableName: String): CarbonRelation = { CarbonEnv.getInstance(CarbonHiveContext).carbonCatalog - 
.lookupRelation1(Option("default"), "highcard")(CarbonHiveContext) - .lookupRelation1(Option("default"), tableName, None)(CarbonHiveContext) ++ .lookupRelation1(Option("default"), tableName)(CarbonHiveContext) .asInstanceOf[CarbonRelation] } @@@ -136,10 -147,10 +147,10 @@@ assert(newC2.hasEncoding(Encoding.DICTIONARY)) } - test("auto identify high cardinality column in first load #396") { - val oldTable = relation("highcard").cubeMeta.carbonTable + test("auto identify high cardinality column in first load #396") { - val oldTable = relation.tableMeta.carbonTable ++ val oldTable = relation("highcard").tableMeta.carbonTable sql(s"LOAD DATA LOCAL INPATH '$filePath' into table highcard") - val newTable = relation.tableMeta.carbonTable - val newTable = relation("highcard").cubeMeta.carbonTable ++ val newTable = relation("highcard").tableMeta.carbonTable sql(s"select count(hc1) from highcard").show // check dictionary file @@@ -147,4 -158,29 +158,29 @@@ // check the meta data checkMetaData(oldTable, newTable) } - ++ + test("skip auto identify high cardinality column for column group") { - val oldTable = relation("colgrp_highcard").cubeMeta.carbonTable ++ val oldTable = relation("colgrp_highcard").tableMeta.carbonTable + sql(s"LOAD DATA LOCAL INPATH '$filePath' into table colgrp_highcard") - val newTable = relation("colgrp_highcard").cubeMeta.carbonTable ++ val newTable = relation("colgrp_highcard").tableMeta.carbonTable + sql(s"select hc1 from colgrp_highcard").show + + // check dictionary file + val tableIdentifier = new CarbonTableIdentifier(newTable.getDatabaseName, + newTable.getFactTableName, "1") + val carbonTablePath = CarbonStorePath.getCarbonTablePath(CarbonHiveContext.hdfsCarbonBasePath, + tableIdentifier) + val newHc1 = newTable.getDimensionByName("colgrp_highcard", "hc1") + val newC2 = newTable.getDimensionByName("colgrp_highcard", "c2") + val dictFileHc1 = carbonTablePath.getDictionaryFilePath(newHc1.getColumnId) + val dictFileC2 = carbonTablePath.getDictionaryFilePath(newC2.getColumnId) + assert(CarbonUtil.isFileExists(dictFileHc1)) + assert(CarbonUtil.isFileExists(dictFileC2)) + // check the meta data + val hc1 = newTable.getDimensionByName("colgrp_highcard", "hc1") + val c2 = newTable.getDimensionByName("colgrp_highcard", "c2") + assert(hc1.hasEncoding(Encoding.DICTIONARY)) + assert(c2.hasEncoding(Encoding.DICTIONARY)) - ++ + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/integration/spark/src/test/scala/org/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala ---------------------------------------------------------------------- diff --cc integration/spark/src/test/scala/org/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala index 5d26965,f1a076b..c38dc50 --- a/integration/spark/src/test/scala/org/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala +++ b/integration/spark/src/test/scala/org/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala @@@ -209,9 -191,10 +191,10 @@@ class GlobalDictionaryUtilTestCase exte GlobalDictionaryUtil .generateGlobalDictionary(CarbonHiveContext, carbonLoadModel, - sampleRelation.cubeMeta.storePath + sampleRelation.tableMeta.storePath ) - checkDictionary(incrementalLoadTableRelation, "deviceInformationId", "100010") + DictionaryTestCaseUtil. 
+ checkDictionary(incrementalLoadTableRelation, "deviceInformationId", "100010") // load 2 carbonLoadModel = buildCarbonLoadModel(incrementalLoadTableRelation, @@@ -222,9 -205,10 +205,10 @@@ GlobalDictionaryUtil .generateGlobalDictionary(CarbonHiveContext, carbonLoadModel, - sampleRelation.cubeMeta.storePath + sampleRelation.tableMeta.storePath ) - checkDictionary(incrementalLoadTableRelation, "deviceInformationId", "100077") + DictionaryTestCaseUtil. + checkDictionary(incrementalLoadTableRelation, "deviceInformationId", "100077") } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/processing/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/processing/src/main/java/org/carbondata/lcm/locks/CarbonLockFactory.java ---------------------------------------------------------------------- diff --cc processing/src/main/java/org/carbondata/lcm/locks/CarbonLockFactory.java index 8fc1cd5,0000000..10aa1fd mode 100644,000000..100644 --- a/processing/src/main/java/org/carbondata/lcm/locks/CarbonLockFactory.java +++ b/processing/src/main/java/org/carbondata/lcm/locks/CarbonLockFactory.java @@@ -1,72 -1,0 +1,73 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.carbondata.lcm.locks; + ++import org.carbondata.core.carbon.CarbonTableIdentifier; +import org.carbondata.core.constants.CarbonCommonConstants; +import org.carbondata.core.util.CarbonProperties; + +/** + * This class is a Lock factory class which is used to provide lock objects. + * Using this lock object client can request the lock and unlock. + */ +public class CarbonLockFactory { + + /** + * lockTypeConfigured to check if zookeeper feature is enabled or not for carbon. + */ + private static String lockTypeConfigured; + + static { + CarbonLockFactory.updateZooKeeperLockingStatus(); + } + + /** + * This method will determine the lock type. 
+ * - * @param location - * @param lockUsage ++ * @param tableIdentifier ++ * @param lockFile + * @return + */ - public static ICarbonLock getCarbonLockObj(String location, LockUsage lockUsage) { ++ public static ICarbonLock getCarbonLockObj(CarbonTableIdentifier tableIdentifier, ++ String lockFile) { + switch (lockTypeConfigured.toUpperCase()) { + case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL: - return new LocalFileLock(location, lockUsage); ++ return new LocalFileLock(tableIdentifier, lockFile); + + case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER: - return new ZooKeeperLocking(location, lockUsage); ++ return new ZooKeeperLocking(tableIdentifier, lockFile); + + case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS: - return new HdfsFileLock(location, lockUsage); ++ return new HdfsFileLock(tableIdentifier, lockFile); + + default: + throw new UnsupportedOperationException("Not supported the lock type"); + } - + } + + /** + * This method will set the zookeeper status whether zookeeper to be used for locking or not. + */ + private static void updateZooKeeperLockingStatus() { + lockTypeConfigured = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.LOCK_TYPE_DEFAULT); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/processing/src/main/java/org/carbondata/lcm/locks/HdfsFileLock.java ---------------------------------------------------------------------- diff --cc processing/src/main/java/org/carbondata/lcm/locks/HdfsFileLock.java index f0c3ad5,0000000..b45ae96 mode 100644,000000..100644 --- a/processing/src/main/java/org/carbondata/lcm/locks/HdfsFileLock.java +++ b/processing/src/main/java/org/carbondata/lcm/locks/HdfsFileLock.java @@@ -1,89 -1,0 +1,91 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.carbondata.lcm.locks; + +import java.io.DataOutputStream; - import java.io.File; +import java.io.IOException; + ++import org.carbondata.core.carbon.CarbonTableIdentifier; ++import org.carbondata.core.constants.CarbonCommonConstants; +import org.carbondata.core.datastorage.store.impl.FileFactory; + +/** + * This class is used to handle the HDFS File locking. + * This is acheived using the concept of acquiring the data out stream using Append option. + */ +public class HdfsFileLock extends AbstractCarbonLock { + + /** + * location hdfs file location + */ + private String location; + - /** - * lockUsage is used to determine the type of the lock. according to this the lock - * folder will change. 
- */ - private LockUsage lockUsage; - + private DataOutputStream dataOutputStream; + ++ public static String tmpPath; ++ ++ static { ++ tmpPath = System.getProperty("hadoop.tmp.dir"); ++ } ++ + /** - * @param location - * @param lockUsage ++ * @param tableIdentifier ++ * @param lockFile + */ - public HdfsFileLock(String location, LockUsage lockUsage) { - this.location = location; - this.lockUsage = lockUsage; - this.location = location + File.separator + this.lockUsage; ++ public HdfsFileLock(CarbonTableIdentifier tableIdentifier, String lockFile) { ++ this.location = ++ tmpPath + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getDatabaseName() ++ + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getTableName() ++ + CarbonCommonConstants.FILE_SEPARATOR + lockFile; + initRetry(); + } + + /* (non-Javadoc) + * @see org.carbondata.core.locks.ICarbonLock#lock() + */ + @Override public boolean lock() { + try { + if (!FileFactory.isFileExist(location, FileFactory.getFileType(location))) { + FileFactory.createNewLockFile(location, FileFactory.getFileType(location)); + } + dataOutputStream = + FileFactory.getDataOutputStreamUsingAppend(location, FileFactory.getFileType(location)); + + return true; + + } catch (IOException e) { + return false; + } + } + + /* (non-Javadoc) + * @see org.carbondata.core.locks.ICarbonLock#unlock() + */ + @Override public boolean unlock() { + if (null != dataOutputStream) { + try { + dataOutputStream.close(); + } catch (IOException e) { + return false; + } + } + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/processing/src/main/java/org/carbondata/lcm/locks/LocalFileLock.java ---------------------------------------------------------------------- diff --cc processing/src/main/java/org/carbondata/lcm/locks/LocalFileLock.java index 1672117,0000000..8eeb57d mode 100644,000000..100644 --- a/processing/src/main/java/org/carbondata/lcm/locks/LocalFileLock.java +++ b/processing/src/main/java/org/carbondata/lcm/locks/LocalFileLock.java @@@ -1,162 -1,0 +1,152 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.carbondata.lcm.locks; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.OverlappingFileLockException; + +import org.carbondata.common.logging.LogService; +import org.carbondata.common.logging.LogServiceFactory; ++import org.carbondata.core.carbon.CarbonTableIdentifier; ++import org.carbondata.core.constants.CarbonCommonConstants; +import org.carbondata.core.datastorage.store.impl.FileFactory; + +/** + * This class handles the file locking in the local file system. 
+ * This will be handled using the file channel lock API. + */ +public class LocalFileLock extends AbstractCarbonLock { + /** + * location is the location of the lock file. + */ + private String location; + + /** - * lockUsage will determine the lock folder. so that similar locks will try to acquire - * same lock file. - */ - private LockUsage lockUsage; - - /** + * fileOutputStream of the local lock file + */ + private FileOutputStream fileOutputStream; + + /** + * channel is the FileChannel of the lock file. + */ + private FileChannel channel; + + /** + * fileLock NIO FileLock Object + */ + private FileLock fileLock; + ++ /** ++ * lock file ++ */ ++ private String lockFile; ++ + public static final String tmpPath; + + private String tableName; + + private String databaseName; + + /** + * LOGGER for logging the messages. + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(LocalFileLock.class.getName()); + + static { + tmpPath = System.getProperty("java.io.tmpdir"); + } + + /** - * @param location - * @param lockUsage ++ * @param tableIdentifier ++ * @param lockFile + */ - public LocalFileLock(String location, LockUsage lockUsage) { - this.lockUsage = lockUsage; - location = location.replace("\\", "/"); - String tempStr = location.substring(0, location.lastIndexOf('/')); - tableName = tempStr.substring(tempStr.lastIndexOf('/') + 1, tempStr.length()); - tempStr = tempStr.substring(0, tempStr.lastIndexOf('/')); - databaseName = tempStr.substring(tempStr.lastIndexOf('/') + 1, tempStr.length()); ++ public LocalFileLock(CarbonTableIdentifier tableIdentifier, String lockFile) { + this.location = - tmpPath + File.separator + databaseName + File.separator + tableName + File.separator - + this.lockUsage; ++ tmpPath + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getDatabaseName() ++ + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getTableName(); ++ this.lockFile = lockFile; + initRetry(); + } + + /** + * Lock API for locking of the file channel of the lock file. + * + * @return + */ + @Override public boolean lock() { + try { - String databaseFolderPath = tmpPath + File.separator + databaseName; - String tableFolderPath = databaseFolderPath + File.separator + tableName; - // create dir with database name in tmp location. - if (!FileFactory.isFileExist(databaseFolderPath, FileFactory.getFileType(tmpPath))) { - FileFactory.mkdirs(databaseFolderPath, FileFactory.getFileType(tmpPath)); - } - - // create dir with table name in tmp location. 
- if (!FileFactory.isFileExist(tableFolderPath, FileFactory.getFileType(tmpPath))) { - FileFactory.mkdirs(tableFolderPath, FileFactory.getFileType(tmpPath)); ++ if (!FileFactory.isFileExist(location, FileFactory.getFileType(tmpPath))) { ++ FileFactory.mkdirs(location, FileFactory.getFileType(tmpPath)); + } - if (!FileFactory.isFileExist(location, FileFactory.getFileType(location))) { - FileFactory.createNewLockFile(location, FileFactory.getFileType(location)); ++ String lockFilePath = location + CarbonCommonConstants.FILE_SEPARATOR + ++ lockFile; ++ if (!FileFactory.isFileExist(lockFilePath, FileFactory.getFileType(location))) { ++ FileFactory.createNewLockFile(lockFilePath, FileFactory.getFileType(location)); + } + - fileOutputStream = new FileOutputStream(location); ++ fileOutputStream = new FileOutputStream(lockFilePath); + channel = fileOutputStream.getChannel(); + try { + fileLock = channel.tryLock(); + } catch (OverlappingFileLockException e) { + return false; + } + if (null != fileLock) { + return true; + } else { + return false; + } + } catch (IOException e) { + return false; + } + + } + + /** + * Unlock API for unlocking of the acquired lock. + * + * @return + */ + @Override public boolean unlock() { + boolean status; + try { + if (null != fileLock) { + fileLock.release(); + } + status = true; + } catch (IOException e) { + status = false; + } finally { + if (null != fileOutputStream) { + try { + fileOutputStream.close(); + } catch (IOException e) { + LOGGER.error(e.getMessage()); + } + } + } + return status; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eaecb651/processing/src/main/java/org/carbondata/lcm/locks/LockUsage.java ---------------------------------------------------------------------- diff --cc processing/src/main/java/org/carbondata/lcm/locks/LockUsage.java index 3b03b1f,0000000..4403f54 mode 100644,000000..100644 --- a/processing/src/main/java/org/carbondata/lcm/locks/LockUsage.java +++ b/processing/src/main/java/org/carbondata/lcm/locks/LockUsage.java @@@ -1,30 -1,0 +1,31 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.carbondata.lcm.locks; + +/** + * This enum is used to define the usecase of the lock. + * Each enum value is one specific lock case. + */ - public enum LockUsage { - METADATA_LOCK, - COMPACTION_LOCK, - TABLE_STATUS_LOCK; ++public class LockUsage { ++ public static String LOCK = ".lock"; ++ public static String METADATA_LOCK="meta.lock"; ++ public static String COMPACTION_LOCK="compaction.lock"; ++ public static String TABLE_STATUS_LOCK="tablestatus.lock"; + +}
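
Taken together, the locking changes above replace the old path-plus-LockUsage-enum constructors with a CarbonTableIdentifier plus a lock-file name, and LockUsage becomes a set of String constants. A minimal sketch of how a caller might acquire and release a metadata lock through the reworked factory API follows; the identifier values, the try/finally structure, and the exact package of ICarbonLock are illustrative assumptions and are not part of this commit.

import org.carbondata.core.carbon.CarbonTableIdentifier;
import org.carbondata.lcm.locks.CarbonLockFactory;
import org.carbondata.lcm.locks.ICarbonLock;
import org.carbondata.lcm.locks.LockUsage;

public class MetadataLockSketch {
  public static void main(String[] args) {
    // Identify the table whose metadata is being updated
    // (database name, table name and table id are illustrative values).
    CarbonTableIdentifier tableIdentifier =
        new CarbonTableIdentifier("default", "alldatatypestable", "1");

    // The factory returns LocalFileLock, ZooKeeperLocking or HdfsFileLock
    // depending on the configured CarbonCommonConstants.LOCK_TYPE property.
    ICarbonLock metadataLock =
        CarbonLockFactory.getCarbonLockObj(tableIdentifier, LockUsage.METADATA_LOCK);

    if (metadataLock.lock()) {
      try {
        // ... perform the metadata operation while holding the lock ...
      } finally {
        // Release the lock; the implementations close the underlying
        // output stream or file channel here.
        metadataLock.unlock();
      }
    }
  }
}

Under this scheme, LocalFileLock keeps its lock file under java.io.tmpdir/<database>/<table>/<lockFile> and HdfsFileLock builds the corresponding path from hadoop.tmp.dir, so the lock location is derived from the table identifier rather than passed in by the caller.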