From: gvramana@apache.org
To: commits@carbondata.incubator.apache.org
Date: Mon, 17 Apr 2017 08:06:34 -0000
Subject: [1/2] incubator-carbondata git commit: changed max columns from static value to configurable

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master a8ed450bf -> 4397d0599


changed max columns from static value to configurable


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/0a09472a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/0a09472a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/0a09472a

Branch: refs/heads/master
Commit: 0a09472a4e710caa3960188b7ab0a405abdc9abc
Parents: a8ed450
Author: kunal642
Authored: Sat Apr 15 13:48:17 2017 +0530
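For context: the commit replaces the parser's fixed column cap with the MAXCOLUMNS load option, validated against the CSV header at load time. A minimal illustration of how the option is exercised, in the style of the test suite (the table and file names here are hypothetical, not part of the commit):

    // hypothetical table/file; MAXCOLUMNS is the load option this commit wires through
    sql("CREATE TABLE t_wide (imei string, age int) STORED BY 'org.apache.carbondata.format'")
    // raise the parser's column cap for this load; values above the hard threshold
    // are rejected during validation (see CommonUtil.validateMaxColumns below)
    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/wide.csv' INTO TABLE t_wide " +
      "OPTIONS('MAXCOLUMNS'='1000')")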
Committer: Venkata Ramana G
Committed: Mon Apr 17 13:32:06 2017 +0530

----------------------------------------------------------------------
 .../hadoop/test/util/StoreCreator.java          |  3 ++
 .../TestDataLoadWithColumnsMoreThanSchema.scala | 36 ++++++-------
 .../dataload/TestLoadDataWithHiveSyntax.scala   |  2 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  3 ++
 .../carbondata/spark/util/CommonUtil.scala      | 57 +++++++++++++++++++-
 .../execution/command/carbonTableSchema.scala   |  4 +-
 .../dataload/SparkDatasourceSuite.scala         |  1 -
 .../util/ExternalColumnDictionaryTestCase.scala |  1 +
 ...GlobalDictionaryUtilConcurrentTestCase.scala | 12 +++--
 .../util/GlobalDictionaryUtilTestCase.scala     |  1 +
 .../execution/command/carbonTableSchema.scala   |  5 +-
 .../processing/csvload/CSVInputFormat.java      | 25 ++++++++-
 .../processing/model/CarbonLoadModel.java       |  1 +
 .../newflow/CarbonDataLoadConfiguration.java    | 12 +++++
 .../carbondata/processing/StoreCreator.java     |  3 ++
 .../processing/csvload/CSVInputFormatTest.java  |  2 +
 16 files changed, 136 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index 51ce2c5..2997e94 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -166,6 +166,7 @@ public class StoreCreator {
     loadModel.setSegmentId("0");
     loadModel.setPartitionId("0");
     loadModel.setFactTimeStamp(System.currentTimeMillis());
+    loadModel.setMaxColumns("10");

     executeGraph(loadModel, absoluteTableIdentifier.getStorePath());

@@ -399,6 +400,8 @@ public class StoreCreator {
     CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
             CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT));
+    CSVInputFormat.setNumberOfColumns(configuration, String.valueOf(loadModel.getCsvHeaderColumns().length));
+    CSVInputFormat.setMaxColumns(configuration, "10");
     TaskAttemptContextImpl hadoopAttemptContext = new TaskAttemptContextImpl(configuration, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
     CSVInputFormat format = new CSVInputFormat();
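Outside the SQL path, the same two knobs are set directly on the Hadoop Configuration, as in the StoreCreator change above. A self-contained sketch of that usage (the values are illustrative):

    import org.apache.hadoop.conf.Configuration
    import org.apache.carbondata.processing.csvload.CSVInputFormat

    val configuration = new Configuration()
    // both setters added by this commit take the value as a String
    CSVInputFormat.setNumberOfColumns(configuration, "7") // columns in the CSV header
    CSVInputFormat.setMaxColumns(configuration, "10")     // parser cap; must exceed the header count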
with BeforeAndAfte } test("test for maxcolumns option value greater than threshold value for maxcolumns") { - sql("DROP TABLE IF EXISTS valid_max_columns_test") - sql("CREATE TABLE valid_max_columns_test (imei string,age int,task bigint,num double,level decimal(10,3),productdate timestamp,mark int,name string)STORED BY 'org.apache.carbondata.format'") - try { + intercept[Exception] { + sql("DROP TABLE IF EXISTS valid_max_columns_test") + sql( + "CREATE TABLE valid_max_columns_test (imei string,age int,task bigint,num double,level decimal(10,3),productdate timestamp,mark int,name string)STORED BY 'org.apache.carbondata.format'") sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/character_carbon.csv' into table valid_max_columns_test options('MAXCOLUMNS'='22000')") - checkAnswer(sql("select count(*) from valid_max_columns_test"), - sql("select count(*) from hive_char_test")) - } catch { - case _: Throwable => assert(false) } } test("test for boundary value for maxcolumns") { - sql("DROP TABLE IF EXISTS boundary_max_columns_test") - sql("CREATE TABLE boundary_max_columns_test (empno string, empname String, designation String, doj String, " + + intercept[Exception] { + sql("DROP TABLE IF EXISTS boundary_max_columns_test") + sql( + "CREATE TABLE boundary_max_columns_test (empno string, empname String, designation " + + "String, doj String, " + "workgroupcategory string, workgroupcategoryname String, deptno string, deptname String, " + "projectcode string, projectjoindate String, projectenddate String,attendance double," + "utilization double,salary double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES" + "('DICTIONARY_EXCLUDE'='empno,empname,designation,doj,workgroupcategory," + "workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate')") - try { - sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' into table boundary_max_columns_test options('MAXCOLUMNS'='14')") - assert(true) - } catch { - case _: Throwable => assert(false) + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' into table boundary_max_columns_test" + + s" options('MAXCOLUMNS'='14')") + } } test("test for maxcolumns value less than columns in 1st line of csv file") { - sql("DROP TABLE IF EXISTS boundary_max_columns_test") - sql("CREATE TABLE boundary_max_columns_test (empno string, empname String, designation String, doj String, " + + intercept[Exception] { + sql("DROP TABLE IF EXISTS boundary_max_columns_test") + sql( + "CREATE TABLE boundary_max_columns_test (empno string, empname String, designation String, doj String, " + "workgroupcategory string, workgroupcategoryname String, deptno string, deptname String, " + "projectcode string, projectjoindate String, projectenddate String,attendance double," + "utilization double,salary double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES" + "('DICTIONARY_EXCLUDE'='empno,empname,designation,doj,workgroupcategory," + "workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate')") - try { sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' into table boundary_max_columns_test options('MAXCOLUMNS'='13')") - assert(true) - } catch { - case _: Throwable => assert(false) } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala ---------------------------------------------------------------------- diff --git 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala
index 353db9e..561b0d1 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala
@@ -651,7 +651,7 @@ class TestLoadDataWithHiveSyntax extends QueryTest with BeforeAndAfterAll {
     )
     sql(
       s"LOAD DATA local inpath '$resourcesPath/comment.csv' INTO TABLE comment_test " +
-      "options('DELIMITER' = ',', 'QUOTECHAR' = '.', 'COMMENTCHAR' = '?','FILEHEADER'='imei,age,task,num,level,productdate,mark,name')"
+      "options('DELIMITER' = ',', 'QUOTECHAR' = '.', 'COMMENTCHAR' = '?','FILEHEADER'='imei,age,task,num,level,productdate,mark,name', 'maxcolumns'='180')"
     )
     checkAnswer(sql("select imei from comment_test"),Seq(Row("\".carbon"),Row("#?carbon"), Row(""), Row("~carbon,")))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 350a2ec..49984c9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -64,6 +64,7 @@ class CarbonMergerRDD[K, V](
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
   sc.setLocalProperty("spark.job.interruptOnCancel", "true")

+  private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
   var storeLocation: String = null
   var mergeResult: String = null
   val hdfsStoreLocation = carbonMergerMapping.hdfsStoreLocation
@@ -260,6 +261,8 @@ class CarbonMergerRDD[K, V](
     val jobConf: JobConf = new JobConf(new Configuration)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
+    // initialise query_id for job
+    job.getConfiguration.set("query.id", queryId)
     var defaultParallelism = sparkContext.defaultParallelism
     val result = new java.util.ArrayList[Partition](defaultParallelism)
     var partitionNo = 0

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 7592e4e..679a4e7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -273,6 +273,9 @@ object CommonUtil {
     CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar)
     CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter)
     CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar)
+    CSVInputFormat.setMaxColumns(configuration, carbonLoadModel.getMaxColumns)
+    CSVInputFormat.setNumberOfColumns(configuration, carbonLoadModel.getCsvHeaderColumns.length
+      .toString)
     CSVInputFormat.setHeaderExtractionEnabled(configuration, carbonLoadModel.getCsvHeader == null ||
       carbonLoadModel.getCsvHeader.isEmpty)
     CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar)
@@ -342,7 +345,59 @@ object CommonUtil {
           + "the same. Input file : " + csvFile)
       }
     }
-
     csvColumns
   }
+
+  def validateMaxColumns(csvHeaders: Array[String], maxColumns: String): Int = {
+    /*
+    User configures both csvheadercolumns, maxcolumns,
+      if csvheadercolumns >= maxcolumns, give error
+      if maxcolumns > threashold, give error
+    User configures csvheadercolumns
+      if csvheadercolumns >= maxcolumns(default) then maxcolumns = csvheadercolumns+1
+      if csvheadercolumns >= threashold, give error
+    User configures nothing
+      if csvheadercolumns >= maxcolumns(default) then maxcolumns = csvheadercolumns+1
+      if csvheadercolumns >= threashold, give error
+    */
+    val columnCountInSchema = csvHeaders.length
+    var maxNumberOfColumnsForParsing = 0
+    val maxColumnsInt = getMaxColumnValue(maxColumns)
+    if (maxColumnsInt != null) {
+      if (columnCountInSchema >= maxColumnsInt) {
+        sys.error(s"csv headers should be less than the max columns: $maxColumnsInt")
+      } else if (maxColumnsInt > CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+        sys.error(s"max columns cannot be greater than the threshold value: ${
+          CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING
+        }")
+      } else {
+        maxNumberOfColumnsForParsing = maxColumnsInt
+      }
+    } else if (columnCountInSchema >= CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+      sys.error(s"csv header columns should be less than max threashold: ${
+        CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING
+      }")
+    } else if (columnCountInSchema >= CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+      maxNumberOfColumnsForParsing = columnCountInSchema + 1
+    } else {
+      maxNumberOfColumnsForParsing = CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING
+    }
+    maxNumberOfColumnsForParsing
+  }
+
+  private def getMaxColumnValue(maxColumn: String): Integer = {
+    if (maxColumn != null) {
+      try {
+        maxColumn.toInt
+      } catch {
+        case e: Exception =>
+          LOGGER.error(s"Invalid value for max column in load options ${ e.getMessage }")
+          null
+      }
+    } else {
+      null
+    }
+  }
+
 }
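Read as a decision table, validateMaxColumns picks the effective parser limit from the header count and the optional user value. A worked sketch of the outcomes (the DEFAULT/THRESHOLD constant values are not visible in this diff; 2000 and 20000 are assumed here, consistent with the test above that rejects MAXCOLUMNS=22000):

    val headers = (1 to 14).map(i => s"col$i").toArray // 14 header columns
    CommonUtil.validateMaxColumns(headers, "100")      // valid user value wins: returns 100
    CommonUtil.validateMaxColumns(headers, null)       // no user value, header count below the
                                                       // default: returns the default (assumed 2000)
    // error cases: user value not above the header count, or above the threshold
    // CommonUtil.validateMaxColumns(headers, "14")    -> sys.error(...)
    // CommonUtil.validateMaxColumns(headers, "22000") -> sys.error(...)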
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 5a22e9c..15472e5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -404,7 +404,6 @@ case class LoadTable(
       validateDateFormat(dateFormat, table)
       val maxColumns = options.getOrElse("maxcolumns", null)
-      carbonLoadModel.setMaxColumns(checkDefaultValue(maxColumns, null))
       carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\"))
       carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
       carbonLoadModel.setCommentChar(checkDefaultValue(commentchar, "#"))
@@ -474,6 +473,9 @@ case class LoadTable(
       carbonLoadModel.setColDictFilePath(columnDict)
       carbonLoadModel.setDirectLoad(true)
       carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
+      val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns,
+        maxColumns)
+      carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
       GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata

       if (carbonLoadModel.getUseOnePass) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
index 4c5b241..0b64759 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
@@ -17,7 +17,6 @@
 package org.apache.carbondata.integration.spark.testsuite.dataload

-import java.io.File
 import org.apache.spark.sql.common.util.QueryTest
 import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 5a986b7..05b94ee 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -148,6 +148,7 @@ class ExternalColumnDictionaryTestCase extends QueryTest with BeforeAndAfterAll
       CarbonCommonConstants.CARBON_DATE_FORMAT,
       CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
     carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
+    carbonLoadModel.setMaxColumns("100")
     carbonLoadModel
   }
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
index 377bbaa..9e0f851 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.carbondata.spark.util

-import java.io.File
 import java.util.concurrent.{Callable, Executors}

 import scala.collection.mutable.ListBuffer
@@ -64,6 +63,7 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft
       CarbonCommonConstants.CARBON_DATE_FORMAT,
       CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
     carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
+    carbonLoadModel.setMaxColumns("2000")
     carbonLoadModel
   }

@@ -88,6 +88,7 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft
       "employee")(sqlContext)
       .asInstanceOf[CarbonRelation]
   }
+
   def writedummydata(filePath: String, recCount: Int) = {
     var a: Int = 0
     var records: StringBuilder = StringBuilder.newBuilder
@@ -98,6 +99,7 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft
     dis.writeBytes(records.toString())
     dis.close()
   }
+
   test("concurrent dictionary generation") {
     CarbonProperties.getInstance.addProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME, "-1")
     val noOfFiles = 5
@@ -114,8 +116,8 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft
     for (i <- 0 until noOfFiles) {
       dictGenerators.add(new DictGenerator(loadModels(i)))
     }
-    val executorService = Executors.newFixedThreadPool(10);
-    val results = executorService.invokeAll(dictGenerators);
+    val executorService = Executors.newFixedThreadPool(10)
+    val results = executorService.invokeAll(dictGenerators)
     for (i <- 0 until noOfFiles) {
       val res = results.get(i).get
       assert("Pass".equals(res))
@@ -128,7 +130,7 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft
     val carbonTableIdentifier = sampleRelation.tableMeta.carbonTable.getCarbonTableIdentifier
     val columnIdentifier = sampleRelation.tableMeta.carbonTable.getDimensionByName("employee", "empid").getColumnIdentifier
     val carbonTablePath = PathFactory.getInstance()
-      .getCarbonTablePath(sampleRelation.tableMeta.storePath, carbonTableIdentifier);
+      .getCarbonTablePath(sampleRelation.tableMeta.storePath, carbonTableIdentifier)
     val dictPath = carbonTablePath.getDictionaryFilePath(columnIdentifier.getColumnId)
     val dictFile = FileFactory.getCarbonFile(dictPath, FileFactory.getFileType(dictPath))
     val offSet = dictFile.getSize
@@ -146,11 +148,13 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft
       file.delete()
     }
   }
+
   override def afterAll {
     sql("drop table if exists employee")
     CarbonProperties.getInstance.addProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME,
       Integer.toString(CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME))
   }
+
   class DictGenerator(loadModel: CarbonLoadModel) extends Callable[String] {
     override def call:String = {
       var result = "Pass"
carbonLoadModel.setMaxColumns("2000") carbonLoadModel } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 1451247..c8e0436 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -405,7 +405,6 @@ case class LoadTable( val dateFormat = options.getOrElse("dateformat", null) validateDateFormat(dateFormat, table) val maxColumns = options.getOrElse("maxcolumns", null) - carbonLoadModel.setMaxColumns(checkDefaultValue(maxColumns, null)) carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\")) carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\"")) carbonLoadModel.setCommentChar(checkDefaultValue(commentchar, "#")) @@ -472,8 +471,10 @@ case class LoadTable( carbonLoadModel.setColDictFilePath(columnDict) carbonLoadModel.setDirectLoad(true) carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel)) + val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns, + maxColumns) + carbonLoadModel.setMaxColumns(validatedMaxColumns.toString) GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata - if (carbonLoadModel.getUseOnePass) { val colDictFilePath = carbonLoadModel.getColDictFilePath if (colDictFilePath != null) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java index 1f7d403..e252e7f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java +++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java @@ -21,6 +21,9 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; + import com.univocity.parsers.csv.CsvParser; import com.univocity.parsers.csv.CsvParserSettings; import org.apache.hadoop.conf.Configuration; @@ -63,6 +66,14 @@ public class CSVInputFormat extends FileInputFormat createRecordReader(InputSplit inputSplit, @@ -145,6 +156,16 @@ public class CSVInputFormat extends FileInputFormat dataLoadProperties = new HashMap<>(); /** @@ -185,6 +189,14 @@ public class CarbonDataLoadConfiguration { this.taskNo = taskNo; } + public void setMaxColumns(String maxColumns) { + this.maxColumns = maxColumns; + } + + public void setNumberOfColumns(int numberOfColumns) { + this.numberOfColumns = String.valueOf(numberOfColumns); + } + public void setDataLoadProperty(String key, Object value) { dataLoadProperties.put(key, value); } 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 85f8470..87f1190 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -166,6 +166,7 @@ public class StoreCreator {
     loadModel.setSegmentId("0");
     loadModel.setPartitionId("0");
     loadModel.setFactTimeStamp(System.currentTimeMillis());
+    loadModel.setMaxColumns("10");

     executeGraph(loadModel, absoluteTableIdentifier.getStorePath());

@@ -399,6 +400,8 @@ public class StoreCreator {
     CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
             CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT));
+    CSVInputFormat.setMaxColumns(configuration, "10");
+    CSVInputFormat.setNumberOfColumns(configuration, "7");
     TaskAttemptContextImpl hadoopAttemptContext = new TaskAttemptContextImpl(configuration, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
     CSVInputFormat format = new CSVInputFormat();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java b/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java
index 66aedb6..676838d 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java
@@ -150,6 +150,8 @@ public class CSVInputFormatTest extends TestCase {

   private void prepareConf(Configuration conf) {
     conf.setBoolean(CSVInputFormat.HEADER_PRESENT, true);
+    conf.set(CSVInputFormat.MAX_COLUMNS, "10");
+    conf.set(CSVInputFormat.NUMBER_OF_COLUMNS, "7");
   }

   private void deleteOutput(File output) {
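The surviving CSVInputFormat hunks do not show where the validated cap finally lands, but the imports point at the univocity parser settings. A hedged sketch of that wiring (the conf lookup and variable names are assumptions; CsvParserSettings.setMaxColumns is real univocity API):

    import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

    // the value would be read from CSVInputFormat.MAX_COLUMNS in the job conf (assumed wiring)
    val maxColumns = 10
    val settings = new CsvParserSettings()
    settings.setMaxColumns(maxColumns) // univocity aborts a row that parses past this many columns
    val parser = new CsvParser(settings)
    val row: Array[String] = parser.parseLine("a,b,c") // Array("a", "b", "c")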