drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [1/2] drill git commit: DRILL-1830: Fix reading map type in parquet reader
Date Tue, 09 Dec 2014 17:35:45 GMT
Repository: drill
Updated Branches:
  refs/heads/0.7.0 503a5b2a4 -> 201280e6e
  refs/heads/master bd3e9138a -> 2396670ed


DRILL-1830: Fix reading map type in parquet reader


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/201280e6
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/201280e6
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/201280e6

Branch: refs/heads/0.7.0
Commit: 201280e6ed09c7852dbe77c637fbe85271a224e1
Parents: 503a5b2
Author: Steven Phillips <sphillips@maprtech.com>
Authored: Tue Dec 9 03:00:02 2014 -0800
Committer: Steven Phillips <sphillips@maprtech.com>
Committed: Tue Dec 9 04:37:15 2014 -0800

----------------------------------------------------------------------
 .../exec/store/parquet2/DrillParquetReader.java | 84 ++++++++++----------
 .../exec/store/parquet/TestParquetComplex.java  | 33 ++++++++
 .../store/parquet/complex/baseline5.json        | 26 ++++++
 .../store/parquet/complex/baseline6.json        | 36 +++++++++
 .../store/parquet/complex/baseline7.json        | 16 ++++
 5 files changed, 154 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/201280e6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index dec5222..cadd8cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -130,54 +130,56 @@ public class DrillParquetReader extends AbstractRecordReader {
     // parquet type.union() seems to lose ConvertedType info when merging two columns that are the same type. This can
     // happen when selecting two elements from an array. So to work around this, we use set of SchemaPath to avoid duplicates
     // and then merge the types at the end
-    Set<SchemaPath> schemaPaths = Sets.newLinkedHashSet(); // Use LinkedHashSet to preserver ordering, otherwise DrillParquetGroupConverter breaks
+    Set<SchemaPath> selectedSchemaPaths = Sets.newLinkedHashSet();
 
+    // get a list of modified columns which have the array elements removed from the schema path since parquet schema doesn't include array elements
+    List<SchemaPath> modifiedColumns = Lists.newLinkedList();
     for (SchemaPath path : columns) {
-      boolean colNotFound=true;
-      schemaColumnLoop: for (ColumnDescriptor colDesc: schemaColumns) {
-        String[] schemaColDesc = Arrays.copyOf(colDesc.getPath(), colDesc.getPath().length);
-        SchemaPath schemaPath = SchemaPath.getCompoundPath(schemaColDesc);
-        PathSegment schemaSeg = schemaPath.getRootSegment();
-        PathSegment colSeg = path.getRootSegment();
-        List<String> segments = Lists.newArrayList();
-        while(schemaSeg != null || colSeg != null){
-          // if one is null but not the other, this is not a match
-          if (schemaSeg == null || colSeg == null) {
-            continue schemaColumnLoop;
-          }
-          if (colSeg.isNamed()) {
-            // DRILL-1739 - Use case insensitive name comparison
-            if(schemaSeg.getNameSegment().getPath().equalsIgnoreCase(colSeg.getNameSegment().getPath())) {
-              segments.add(schemaSeg.getNameSegment().getPath());
-            }else{
-              continue schemaColumnLoop;
-            }
-          }else{
-            colSeg=colSeg.getChild();
-            continue;
-          }
-          while ((colSeg = colSeg.getChild()) != null && colSeg.isArray()) {
-            colSeg = colSeg.getChild();
-            if (colSeg == null) {
-              break;
-            }
-          }
-          schemaSeg = schemaSeg.getChild();
+      List<String> segments = Lists.newArrayList();
+      PathSegment seg = path.getRootSegment();
+      do {
+        if (seg.isNamed()) {
+          segments.add(seg.getNameSegment().getPath());
         }
-        // Field exists in schema
-        if (!segments.isEmpty()) {
-          String[] pathSegments = new String[segments.size()];
-          segments.toArray(pathSegments);
-          colNotFound=false;
-          schemaPaths.add(schemaPath);
-          break;
+      } while ((seg = seg.getChild()) != null);
+      String[] pathSegments = new String[segments.size()];
+      segments.toArray(pathSegments);
+      SchemaPath modifiedSchemaPath = SchemaPath.getCompoundPath(pathSegments);
+      modifiedColumns.add(modifiedSchemaPath);
+    }
+
+    // convert the columns in the parquet schema to a list of SchemaPath columns so that they can be compared in case insensitive manner
+    // to the projection columns
+    List<SchemaPath> schemaPaths = Lists.newLinkedList();
+    for (ColumnDescriptor columnDescriptor : schemaColumns) {
+      String[] schemaColDesc = Arrays.copyOf(columnDescriptor.getPath(), columnDescriptor.getPath().length);
+      SchemaPath schemaPath = SchemaPath.getCompoundPath(schemaColDesc);
+      schemaPaths.add(schemaPath);
+    }
+
+    // loop through columns in parquet schema and add columns that are included in project list
+    outer: for (SchemaPath schemaPath : schemaPaths) {
+      for (SchemaPath columnPath : columns) {
+        if (columnPath.contains(schemaPath)) {
+          selectedSchemaPaths.add(schemaPath);
+          continue outer;
         }
       }
-      if(colNotFound){
-        columnsNotFound.add(path);
+    }
+
+    // loop through projection columns and add any columns that are missing from parquet schema to columnsNotFound list
+    outer: for (SchemaPath columnPath : modifiedColumns) {
+      for (SchemaPath schemaPath : schemaPaths) {
+        if (schemaPath.contains(columnPath)) {
+          continue outer;
+        }
       }
+      columnsNotFound.add(columnPath);
     }
-    for (SchemaPath schemaPath : schemaPaths) {
+
+
+    // convert SchemaPaths from selectedSchemaPaths and convert to parquet type, and merge into projection schema
+    for (SchemaPath schemaPath : selectedSchemaPaths) {
       List<String> segments = Lists.newArrayList();
       PathSegment seg = schemaPath.getRootSegment();
       do {

http://git-wip-us.apache.org/repos/asf/drill/blob/201280e6/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
index 9919d3b..892d453 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
@@ -25,6 +25,39 @@ public class TestParquetComplex extends BaseTestQuery {
   private static final String DATAFILE = "cp.`store/parquet/complex/complex.parquet`";
 
   @Test
+  public void selectMap() throws Exception {
+    String query = "select marketing_info from cp.`store/parquet/complex/complex.parquet`";
+    testBuilder()
+            .sqlQuery(query)
+            .ordered()
+            .jsonBaselineFile("store/parquet/complex/baseline5.json")
+            .build()
+            .run();
+  }
+
+  @Test
+  public void selectMapAndElements() throws Exception {
+    String query = "select marketing_info, t.marketing_info.camp_id as camp_id, t.marketing_info.keywords[2] as keyword2 from cp.`store/parquet/complex/complex.parquet` t";
+    testBuilder()
+            .sqlQuery(query)
+            .ordered()
+            .jsonBaselineFile("store/parquet/complex/baseline6.json")
+            .build()
+            .run();
+  }
+
+  @Test
+  public void selectMultiElements() throws Exception {
+    String query = "select t.marketing_info.camp_id as camp_id, t.marketing_info.keywords as keywords from cp.`store/parquet/complex/complex.parquet` t";
+    testBuilder()
+            .sqlQuery(query)
+            .ordered()
+            .jsonBaselineFile("store/parquet/complex/baseline7.json")
+            .build()
+            .run();
+  }
+
+  @Test
   public void testStar() throws Exception {
     testBuilder()
             .sqlQuery("select * from cp.`store/parquet/complex/complex.parquet`")

http://git-wip-us.apache.org/repos/asf/drill/blob/201280e6/exec/java-exec/src/test/resources/store/parquet/complex/baseline5.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/parquet/complex/baseline5.json b/exec/java-exec/src/test/resources/store/parquet/complex/baseline5.json
new file mode 100644
index 0000000..c9485e6
--- /dev/null
+++ b/exec/java-exec/src/test/resources/store/parquet/complex/baseline5.json
@@ -0,0 +1,26 @@
+{
+  "marketing_info" : {
+    "camp_id" : 4,
+    "keywords" : [ "go", "to", "thing", "watch", "made", "laughing", "might", "pay", "in", "your", "hold" ]
+  }
+} {
+  "marketing_info" : {
+    "camp_id" : 6,
+    "keywords" : [ "pronounce", "tree", "instead", "games", "sigh" ]
+  }
+} {
+  "marketing_info" : {
+    "camp_id" : 17,
+    "keywords" : [ ]
+  }
+} {
+  "marketing_info" : {
+    "camp_id" : 17,
+    "keywords" : [ "it's" ]
+  }
+} {
+  "marketing_info" : {
+    "camp_id" : 8,
+    "keywords" : [ "fallout" ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/201280e6/exec/java-exec/src/test/resources/store/parquet/complex/baseline6.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/parquet/complex/baseline6.json b/exec/java-exec/src/test/resources/store/parquet/complex/baseline6.json
new file mode 100644
index 0000000..abbf15d
--- /dev/null
+++ b/exec/java-exec/src/test/resources/store/parquet/complex/baseline6.json
@@ -0,0 +1,36 @@
+{
+  "marketing_info" : {
+    "camp_id" : 4,
+    "keywords" : [ "go", "to", "thing", "watch", "made", "laughing", "might", "pay", "in", "your", "hold" ]
+  },
+  "camp_id" : 4,
+  "keyword2" : "thing"
+} {
+  "marketing_info" : {
+    "camp_id" : 6,
+    "keywords" : [ "pronounce", "tree", "instead", "games", "sigh" ]
+  },
+  "camp_id" : 6,
+  "keyword2" : "instead"
+} {
+  "marketing_info" : {
+    "camp_id" : 17,
+    "keywords" : [ ]
+  },
+  "camp_id" : 17,
+  "keyword2" : null
+} {
+  "marketing_info" : {
+    "camp_id" : 17,
+    "keywords" : [ "it's" ]
+  },
+  "camp_id" : 17,
+  "keyword2" : null
+} {
+  "marketing_info" : {
+    "camp_id" : 8,
+    "keywords" : [ "fallout" ]
+  },
+  "camp_id" : 8,
+  "keyword2" : null
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/201280e6/exec/java-exec/src/test/resources/store/parquet/complex/baseline7.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/parquet/complex/baseline7.json b/exec/java-exec/src/test/resources/store/parquet/complex/baseline7.json
new file mode 100644
index 0000000..5e89dd7
--- /dev/null
+++ b/exec/java-exec/src/test/resources/store/parquet/complex/baseline7.json
@@ -0,0 +1,16 @@
+{
+  "camp_id" : 4,
+  "keywords" : [ "go", "to", "thing", "watch", "made", "laughing", "might", "pay", "in", "your", "hold" ]
+} {
+  "camp_id" : 6,
+  "keywords" : [ "pronounce", "tree", "instead", "games", "sigh" ]
+} {
+  "camp_id" : 17,
+  "keywords" : [ ]
+} {
+  "camp_id" : 17,
+  "keywords" : [ "it's" ]
+} {
+  "camp_id" : 8,
+  "keywords" : [ "fallout" ]
+}
\ No newline at end of file


Mime
View raw message