From: QiangCai
To: issues@carbondata.apache.org
Reply-To: issues@carbondata.apache.org
Subject: [GitHub] carbondata pull request #1876: [CARBONDATA-2093] Use small file feature of g...
Date: Tue, 30 Jan 2018 07:08:04 +0000 (UTC)

Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1876#discussion_r164655879

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala ---
@@ -414,6 +416,75 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
       sql("select * from casesensitivepartition where empno=17"))
   }
 
+  test("Partition LOAD with small files") {
+    sql("DROP TABLE IF EXISTS smallpartitionfiles")
+    sql(
+      """
+        | CREATE TABLE smallpartitionfiles(id INT, name STRING, age INT) PARTITIONED BY(city STRING)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    val inputPath = new File("target/small_files").getCanonicalPath
+    val folder = new File(inputPath)
+    if (folder.exists()) {
+      FileUtils.deleteDirectory(folder)
+    }
+    folder.mkdir()
+    for (i <- 0 to 100) {
+      val file = s"$folder/file$i.csv"
+      val writer = new FileWriter(file)
+      writer.write("id,name,city,age\n")
+      writer.write(s"$i,name_$i,city_${i % 5},${i % 100}")
+      writer.close()
+    }
+    sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE smallpartitionfiles")
+    FileUtils.deleteDirectory(folder)
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "smallpartitionfiles")
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+    val segmentDir = carbonTablePath.getSegmentDir("0", "0")
+    assert(new File(segmentDir).listFiles().length < 50)
+  }
+
+  test("verify partition read with small files") {
+    try {
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES)
+      sql("DROP TABLE IF EXISTS smallpartitionfilesread")
+      sql(
+        """
+          | CREATE TABLE smallpartitionfilesread(id INT, name STRING, age INT) PARTITIONED BY
+          | (city STRING)
+          | STORED BY 'org.apache.carbondata.format'
+        """.stripMargin)
+      val inputPath = new File("target/small_files").getCanonicalPath
+      val folder = new File(inputPath)
+      if (folder.exists()) {
+        FileUtils.deleteDirectory(folder)
+      }
+      folder.mkdir()
+      for (i <- 0 until 100) {
+        val file = s"$folder/file$i.csv"
+        val writer = new FileWriter(file)
+        writer.write("id,name,city,age\n")
+        writer.write(s"$i,name_$i,city_$i,${i % 100}")
+        writer.close()
+      }
+      sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE smallpartitionfilesread")
+      FileUtils.deleteDirectory(folder)
+      val dataFrame = sql("select * from smallpartitionfilesread")
+      val scanRdd = dataFrame.queryExecution.sparkPlan.collect {
+        case b: BatchedDataSourceScanExec if b.rdd.isInstanceOf[CarbonScanRDD] => b.rdd
+          .asInstanceOf[CarbonScanRDD]
+      }.head
+      assert(scanRdd.getPartitions.length < 10)
+      assertResult(100)(dataFrame.collect().length)

--- End diff --

Suggest using dataFrame.count instead of dataFrame.collect().length.

---
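The suggested change amounts to the following (a fragment in the context of the test above; `dataFrame` is the result of the `select` in the diff):

```scala
// As written in the PR: collect() ships every row to the driver and builds
// an Array[Row], only for its length to be checked.
assertResult(100)(dataFrame.collect().length)

// As suggested by the reviewer: count() is evaluated as a distributed
// aggregate, so only a single Long is returned to the driver. Same
// assertion, but no full materialization of the result set.
assertResult(100)(dataFrame.count())
```

For a 100-row test table the difference is negligible, but using count keeps the test pattern safe if the fixture ever grows.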