carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject carbondata git commit: [CARBONDATA-2888] Support multi level subfolder for SDK read and fileformat read
Date Wed, 05 Sep 2018 06:35:27 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 21a72bf2e -> 70fe5144d


[CARBONDATA-2888] Support multi level subfolder for SDK read and fileformat read

This PR supports reading from multi-level subfolders for the SDK reader and Spark's carbon fileformat reader.

This closes #2661


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/70fe5144
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/70fe5144
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/70fe5144

Branch: refs/heads/master
Commit: 70fe5144dcdb338c03b1355a2f5a36172713f89a
Parents: 21a72bf
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Sun Aug 26 22:47:14 2018 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Wed Sep 5 14:35:13 2018 +0800

----------------------------------------------------------------------
 .../blockletindex/SegmentIndexFileStore.java    | 18 +++++++++
 .../core/metadata/schema/table/CarbonTable.java | 29 +++++++-------
 .../LatestFilesReadCommittedScope.java          |  4 +-
 .../CarbonFileIndexReplaceRule.scala            | 30 +++++++++++++--
 .../datasource/SparkCarbonDataSourceTest.scala  | 22 +++++++++++
 .../carbondata/sdk/file/CarbonReaderTest.java   | 40 ++++++++++++++++++++
 6 files changed, 124 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/70fe5144/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index c4e7f7a..25cfc26 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -341,6 +341,24 @@ public class SegmentIndexFileStore {
   /**
    * List all the index files of the segment.
    *
+   * @param carbonFile directory
+   */
+  public static void getCarbonIndexFilesRecursively(CarbonFile carbonFile,
+      List<CarbonFile> indexFiles) {
+    CarbonFile[] carbonFiles = carbonFile.listFiles();
+    for (CarbonFile file : carbonFiles) {
+      if (file.isDirectory()) {
+        getCarbonIndexFilesRecursively(file, indexFiles);
+      } else if ((file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
+          .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && file.getSize() > 0) {
+        indexFiles.add(file);
+      }
+    }
+  }
+
+  /**
+   * List all the index files of the segment.
+   *
    * @param segmentPath
    * @return
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/70fe5144/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 14052f8..c66d168 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -36,7 +36,6 @@ import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -239,22 +238,12 @@ public class CarbonTable implements Serializable {
       String tablePath,
       String tableName) throws IOException {
     TableInfo tableInfoInfer = CarbonUtil.buildDummyTableInfo(tablePath, "null", "null");
-    CarbonFile[] carbonFiles = FileFactory
-        .getCarbonFile(tablePath)
-        .listFiles(new CarbonFileFilter() {
-          @Override
-          public boolean accept(CarbonFile file) {
-            if (file == null) {
-              return false;
-            }
-            return file.getName().endsWith("carbonindex");
-          }
-        });
-    if (carbonFiles == null || carbonFiles.length < 1) {
+    CarbonFile carbonFile = getFirstIndexFile(FileFactory.getCarbonFile(tablePath));
+    if (carbonFile == null) {
       throw new RuntimeException("Carbon index file not exists.");
     }
     org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil
-        .inferSchemaFromIndexFile(carbonFiles[0].getPath(), tableName);
+        .inferSchemaFromIndexFile(carbonFile.getPath(), tableName);
     List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
     for (org.apache.carbondata.format.ColumnSchema thriftColumnSchema : tableInfo
         .getFact_table().getTable_columns()) {
@@ -268,6 +257,18 @@ public class CarbonTable implements Serializable {
     return CarbonTable.buildFromTableInfo(tableInfoInfer);
   }
 
+  private static CarbonFile getFirstIndexFile(CarbonFile tablePath) {
+    CarbonFile[] carbonFiles = tablePath.listFiles();
+    for (CarbonFile carbonFile : carbonFiles) {
+      if (carbonFile.isDirectory()) {
+        return getFirstIndexFile(carbonFile);
+      } else if (carbonFile.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+        return carbonFile;
+      }
+    }
+    return null;
+  }
+
   public static CarbonTable buildDummyTable(String tablePath) throws IOException {
     TableInfo tableInfoInfer = CarbonUtil.buildDummyTableInfo(tablePath, "null", "null");
     return CarbonTable.buildFromTableInfo(tableInfoInfer);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/70fe5144/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
index 7dd061e..abd9c2c 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -175,7 +175,9 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
     CarbonFile[] carbonIndexFiles = null;
     if (file.isDirectory()) {
       if (segmentId == null) {
-        carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(file);
+        List<CarbonFile> indexFiles = new ArrayList<>();
+        SegmentIndexFileStore.getCarbonIndexFilesRecursively(file, indexFiles);
+        carbonIndexFiles = indexFiles.toArray(new CarbonFile[0]);
       } else {
         String segmentPath = CarbonTablePath.getSegmentPath(carbonFilePath, segmentId);
         carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/70fe5144/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala
b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala
index f86200a..b0d9c07 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala
@@ -16,12 +16,15 @@
  */
 package org.apache.spark.sql.carbondata.execution.datasources
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, InMemoryFileIndex, InsertIntoHadoopFsRelationCommand, LogicalRelation}
 import org.apache.spark.sql.sources.BaseRelation
 
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -67,10 +70,10 @@ class CarbonFileIndexReplaceRule extends Rule[LogicalPlan] {
       hadoopFsRelation: HadoopFsRelation): FileIndex = {
     if (fileIndex.isInstanceOf[InMemoryFileIndex] && fileIndex.rootPaths.length == 1) {
       val carbonFile = FileFactory.getCarbonFile(fileIndex.rootPaths.head.toUri.toString)
-      val carbonFiles = carbonFile.listFiles()
-      if (carbonFiles.nonEmpty &&
-          !carbonFiles.exists(_.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT))) {
-        val paths = carbonFiles.map(p => new Path(p.getAbsolutePath)).toSeq
+      val dataFolders = new ArrayBuffer[CarbonFile]()
+      getDataFolders(carbonFile, dataFolders)
+      if (dataFolders.nonEmpty && dataFolders.length > 1) {
+        val paths = dataFolders.map(p => new Path(p.getAbsolutePath))
         new InMemoryFileIndex(hadoopFsRelation.sparkSession,
           paths,
           hadoopFsRelation.options,
@@ -82,4 +85,23 @@ class CarbonFileIndexReplaceRule extends Rule[LogicalPlan] {
       fileIndex
     }
   }
+
+  /**
+   * Get datafolders recursively
+   */
+  private def getDataFolders(
+      tableFolder: CarbonFile,
+      dataFolders: ArrayBuffer[CarbonFile]): Unit = {
+    val files = tableFolder.listFiles()
+    files.foreach { f =>
+      if (f.isDirectory) {
+        val files = f.listFiles()
+        if (files.nonEmpty && !files(0).isDirectory) {
+          dataFolders += f
+        } else {
+          getDataFolders(f, dataFolders)
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/70fe5144/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index 7e51125..837bc4f 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -590,7 +590,29 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     }
   }
 
+  test("test write using multi subfolder") {
+    if (!spark.sparkContext.version.startsWith("2.1")) {
+      FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+      import spark.implicits._
+      val df = spark.sparkContext.parallelize(1 to 10)
+        .map(x => ("a" + x % 10, "b", x))
+        .toDF("c1", "c2", "number")
+
+      // Saves dataframe to carbon file
+      df.write.format("carbon").save(warehouse1 + "/test_folder/1/" + System.nanoTime())
+      df.write.format("carbon").save(warehouse1 + "/test_folder/2/" + System.nanoTime())
+      df.write.format("carbon").save(warehouse1 + "/test_folder/3/" + System.nanoTime())
 
+      val frame = spark.read.format("carbon").load(warehouse1 + "/test_folder")
+      assert(frame.count() == 30)
+      assert(frame.where("c1='a1'").count() == 3)
+      val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
+      DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/test_folder"))
+      assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
+      FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+    }
+  }
   override protected def beforeAll(): Unit = {
     drop
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/70fe5144/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index 020bd06..54b3e9e 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -1665,4 +1665,44 @@ public class CarbonReaderTest extends TestCase {
     Assert.assertEquals(i, 100);
   }
 
+  @Test
+  public void testReadWithFilterOfnonTransactionalwithsubfolders() throws IOException, InterruptedException {
+    String path1 = "./testWriteFiles/1/"+System.nanoTime();
+    String path2 = "./testWriteFiles/2/"+System.nanoTime();
+    String path3 = "./testWriteFiles/3/"+System.nanoTime();
+    FileUtils.deleteDirectory(new File("./testWriteFiles"));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path1, false, false);
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path2, false, false);
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path3, false, false);
+
+    EqualToExpression equalToExpression = new EqualToExpression(
+        new ColumnExpression("name", DataTypes.STRING),
+        new LiteralExpression("robot1", DataTypes.STRING));
+    CarbonReader reader = CarbonReader
+        .builder("./testWriteFiles", "_temp")
+        .isTransactionalTable(false)
+        .projection(new String[]{"name", "age"})
+        .filter(equalToExpression)
+        .build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      // Default sort column is applied for dimensions. So, need  to validate accordingly
+      assert ("robot1".equals(row[0]));
+      i++;
+    }
+    Assert.assertEquals(i, 60);
+
+    reader.close();
+
+    FileUtils.deleteDirectory(new File("./testWriteFiles"));
+  }
+
+
 }


Mime
View raw message