drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [1/4] drill git commit: DRILL-1774: Update JSON Reader to do single pass reading and better use Jackson's interning. Also improve projection pushdown support.
Date Mon, 01 Dec 2014 05:45:32 GMT
Repository: drill
Updated Branches:
  refs/heads/master 3d836b518 -> a60e1dbbf


http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/UTF8JsonRecordSplitter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/UTF8JsonRecordSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/UTF8JsonRecordSplitter.java
deleted file mode 100644
index 3dca86e..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/UTF8JsonRecordSplitter.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.vector.complex.fn;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Charsets;
-
-public class UTF8JsonRecordSplitter extends JsonRecordSplitterBase {
-  private final static org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UTF8JsonRecordSplitter.class);
-
-  private final InputStream incoming;
-
-  public UTF8JsonRecordSplitter(InputStream incoming){
-    this.incoming = new BufferedInputStream(incoming);
-  }
-
-  @Override
-  protected void preScan() throws IOException {
-    incoming.mark(MAX_RECORD_SIZE);
-  }
-
-  @Override
-  protected void postScan() throws IOException {
-    incoming.reset();
-  }
-
-  @Override
-  protected int readNext() throws IOException {
-    return incoming.read();
-  }
-
-  @Override
-  protected Reader createReader(long maxBytes) {
-    return new BufferedReader(new InputStreamReader(new DelInputStream(incoming, maxBytes), Charsets.UTF_8));
-  }
-
-  private class DelInputStream extends InputStream {
-
-    private final InputStream incoming;
-    private final long maxBytes;
-    private long bytes = 0;
-
-    public DelInputStream(InputStream in, long maxBytes) {
-      this.maxBytes = maxBytes;
-      this.incoming = in;
-    }
-
-    @Override
-    public int read() throws IOException {
-      if (bytes >= maxBytes){
-        return -1;
-      }else{
-        bytes++;
-        return incoming.read();
-      }
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    String path = "/Users/hgunes/workspaces/mapr/incubator-drill/yelp_academic_dataset_review.json";
-    InputStream s = new FileInputStream(new File(path));
-    JsonRecordSplitter splitter = new UTF8JsonRecordSplitter(s);
-    int recordCount = 0;
-    Reader record = null;
-    ObjectMapper mapper = new ObjectMapper();
-    try {
-      while ((record = splitter.getNextReader()) != null) {
-        recordCount++;
-        JsonNode node = mapper.readTree(record);
-        out(node.toString());
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-    out("last record: " + recordCount);
-  }
-
-  static void out(Object thing) {
-    System.out.println(thing);
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
index 70e54ff..f9eb2c2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
@@ -56,7 +56,7 @@ public class TestFlatten extends BaseTestQuery {
   @Test
   public void drill1653() throws Exception{
     int rowCount = testSql("select * from (select sum(t.flat.`value`) as sm from (select id, flatten(kvgen(m)) as flat from cp.`/flatten/missing-map.json`)t) where sm = 10 ");
-    assertEquals(rowCount, 1);
+    assertEquals(1, rowCount);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java
deleted file mode 100644
index 9df6cf0..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.json;
-
-import org.apache.drill.BaseTestQuery;
-import org.junit.Test;
-
-
-public class JsonRecordReader2Test extends BaseTestQuery{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonRecordReader2Test.class);
-
-  @Test
-  public void testComplexJsonInput() throws Exception{
-//  test("select z[0]['orange']  from cp.`jsoninput/input2.json` limit 10");
-    test("select `integer`, x['y'] as x1, x['y'] as x2, z[0], z[0]['orange'], z[1]['pink'] from cp.`jsoninput/input2.json` limit 10 ");
-//    test("select x from cp.`jsoninput/input2.json`");
-
-//    test("select z[0]  from cp.`jsoninput/input2.json` limit 10");
-  }
-
-  @Test
-  public void testComplexMultipleTimes() throws Exception{
-    for(int i =0 ; i < 5; i++){
-    test("select * from cp.`join/merge_join.json`");
-    }
-  }
-
-  @Test
-  public void trySimpleQueryWithLimit() throws Exception{
-    test("select * from cp.`limit/test1.json` limit 10");
-  }
-
-  @Test// DRILL-1634 : retrieve an element in a nested array in a repeated map.  RepeatedMap (Repeated List (Repeated varchar))
-  public void testNestedArrayInRepeatedMap() throws Exception {
-    test("select a[0].b[0] from cp.`jsoninput/nestedArray.json`");
-    test("select a[0].b[1] from cp.`jsoninput/nestedArray.json`");
-    test("select a[1].b[1] from cp.`jsoninput/nestedArray.json`");  // index out of the range. Should return empty list.
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
new file mode 100644
index 0000000..4bc1ee7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.json;
+
+import org.apache.drill.BaseTestQuery;
+import org.junit.Test;
+
+
+public class TestJsonRecordReader extends BaseTestQuery{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestJsonRecordReader.class);
+
+  @Test
+  public void testComplexJsonInput() throws Exception{
+//  test("select z[0]['orange']  from cp.`jsoninput/input2.json` limit 10");
+    test("select `integer`, x['y'] as x1, x['y'] as x2, z[0], z[0]['orange'], z[1]['pink'] from cp.`jsoninput/input2.json` limit 10 ");
+//    test("select x from cp.`jsoninput/input2.json`");
+
+//    test("select z[0]  from cp.`jsoninput/input2.json` limit 10");
+  }
+
+  @Test
+  public void testComplexMultipleTimes() throws Exception{
+    for(int i =0 ; i < 5; i++){
+    test("select * from cp.`join/merge_join.json`");
+    }
+  }
+
+  @Test
+  public void trySimpleQueryWithLimit() throws Exception{
+    test("select * from cp.`limit/test1.json` limit 10");
+  }
+
+  @Test// DRILL-1634 : retrieve an element in a nested array in a repeated map.  RepeatedMap (Repeated List (Repeated varchar))
+  public void testNestedArrayInRepeatedMap() throws Exception {
+    test("select a[0].b[0] from cp.`jsoninput/nestedArray.json`");
+    test("select a[0].b[1] from cp.`jsoninput/nestedArray.json`");
+    test("select a[1].b[1] from cp.`jsoninput/nestedArray.json`");  // index out of the range. Should return empty list.
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
index bf81ba2..61048ee 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
@@ -17,19 +17,19 @@
  */
 package org.apache.drill.exec.vector.complex.writer;
 
-import static org.junit.Assert.*;
-import io.netty.buffer.DrillBuf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
-import java.io.ByteArrayOutputStream;
 import java.util.List;
 
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
-import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -37,20 +37,11 @@ import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.RepeatedBigIntVector;
-import org.apache.drill.exec.vector.complex.MapVector;
-import org.apache.drill.exec.vector.complex.fn.JsonReaderWithState;
-import org.apache.drill.exec.vector.complex.fn.JsonWriter;
-import org.apache.drill.exec.vector.complex.fn.ReaderJSONRecordSplitter;
-import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl;
-import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
 public class TestJsonReader extends BaseTestQuery {
@@ -184,6 +175,14 @@ public class TestJsonReader extends BaseTestQuery {
     test("alter system set `store.json.all_text_mode` = false");
   }
 
+  @Test
+  public void ensureProjectionPushdown() throws Exception {
+    // Tests to make sure that we are correctly eliminating schema changing columns.  If completes, means that the projection pushdown was successful.
+    test("alter system set `store.json.all_text_mode` = false; "
+        + "select  t.field_1, t.field_3.inner_1, t.field_3.inner_2, t.field_4.inner_1 "
+        + "from cp.`store/json/schema_change_int_to_string.json` t");
+  }
+
   // The project pushdown rule is correctly adding the projected columns to the scan, however it is not removing
   // the redundant project operator after the scan, this tests runs a physical plan generated from one of the tests to
   // ensure that the project is filtering out the correct data in the scan alone
@@ -202,26 +201,12 @@ public class TestJsonReader extends BaseTestQuery {
     RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
     QueryResultBatch batch = results.get(1);
     assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
-    assertEquals(5, batchLoader.getSchema().getFieldCount());
-    testExistentColumns(batchLoader, batch);
-
-    VectorWrapper vw = batchLoader.getValueAccessorById(
-        NullableIntVector.class, //
-        batchLoader.getValueVectorId(SchemaPath.getCompoundPath("non_existent_at_root")).getFieldIds() //
-    );
-    assertNull(vw.getValueVector().getAccessor().getObject(0));
-    assertNull(vw.getValueVector().getAccessor().getObject(1));
-    assertNull(vw.getValueVector().getAccessor().getObject(2));
 
-    vw = batchLoader.getValueAccessorById(
-        NullableIntVector.class, //
-        batchLoader.getValueVectorId(SchemaPath.getCompoundPath("non_existent", "nested","field")).getFieldIds() //
-    );
-    assertNull(vw.getValueVector().getAccessor().getObject(0));
-    assertNull(vw.getValueVector().getAccessor().getObject(1));
-    assertNull(vw.getValueVector().getAccessor().getObject(2));
+    // this used to be five.  It is now three.  This is because the plan doesn't have a project.
+    // Scanners are not responsible for projecting non-existent columns (as long as they project one column)
+    assertEquals(3, batchLoader.getSchema().getFieldCount());
+    testExistentColumns(batchLoader, batch);
 
-    vw.getValueVector().clear();
     batch.release();
     batchLoader.clear();
   }
@@ -260,113 +245,5 @@ public class TestJsonReader extends BaseTestQuery {
     assertEquals("[4,5,6]", vw.getValueVector().getAccessor().getObject(2).toString());
   }
 
-  @Test
-  public void testReader() throws Exception{
-    final int repeatSize = 10;
-
-    String simple = " { \"b\": \"hello\", \"c\": \"goodbye\"}\n " +
-        "{ \"b\": \"yellow\", \"c\": \"red\", \"p\":" +
-    "{ \"integer\" : 2001, \n" +
-        "  \"float\"   : 1.2,\n" +
-        "  \"x\": {\n" +
-        "    \"y\": \"friends\",\n" +
-        "    \"z\": \"enemies\"\n" +
-        "  },\n" +
-        "  \"z\": [\n" +
-        "    {\"orange\" : \"black\" },\n" +
-        "    {\"pink\" : \"purple\" }\n" +
-        "  ]\n" +
-        "  \n" +
-        "}\n }";
-
-    String compound = simple;
-    for (int i =0; i < repeatSize; i++) {
-      compound += simple;
-    }
-
-//    simple = "{ \"integer\" : 2001, \n" +
-//        "  \"float\"   : 1.2\n" +
-//        "}\n" +
-//        "{ \"integer\" : -2002,\n" +
-//        "  \"float\"   : -1.2 \n" +
-//        "}";
-    MapVector v = new MapVector("", allocator, null);
-    ComplexWriterImpl writer = new ComplexWriterImpl("col", v);
-    writer.allocate();
-
-    DrillBuf buffer = allocator.buffer(255);
-    JsonReaderWithState jsonReader = new JsonReaderWithState(new ReaderJSONRecordSplitter(compound), buffer,
-        GroupScan.ALL_COLUMNS, false);
-    int i =0;
-    List<Integer> batchSizes = Lists.newArrayList();
-
-    outside: while(true) {
-      writer.setPosition(i);
-      switch (jsonReader.write(writer)) {
-      case WRITE_SUCCEED:
-        i++;
-        break;
-      case NO_MORE:
-        batchSizes.add(i);
-        System.out.println("no more records - main loop");
-        break outside;
-
-      case WRITE_FAILED:
-        System.out.println("==== hit bounds at " + i);
-        //writer.setValueCounts(i - 1);
-        batchSizes.add(i);
-        i = 0;
-        writer.allocate();
-        writer.reset();
-
-        switch(jsonReader.write(writer)) {
-        case NO_MORE:
-          System.out.println("no more records - new alloc loop.");
-          break outside;
-        case WRITE_FAILED:
-          throw new RuntimeException("Failure while trying to write.");
-        case WRITE_SUCCEED:
-          i++;
-        };
-
-      };
-    }
-
-    int total = 0;
-    int lastRecordCount = 0;
-    for (Integer records : batchSizes) {
-      total += records;
-      lastRecordCount = records;
-    }
-
-
-    ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
-
-    ow.writeValueAsString(v.getAccessor().getObject(0));
-    ow.writeValueAsString(v.getAccessor().getObject(1));
-    FieldReader reader = v.get("col", MapVector.class).getAccessor().getReader();
-
-    ByteArrayOutputStream stream = new ByteArrayOutputStream();
-    JsonWriter jsonWriter = new JsonWriter(stream, true);
-
-    reader.setPosition(0);
-    jsonWriter.write(reader);
-    reader.setPosition(1);
-    jsonWriter.write(reader);
-    System.out.print("Json Read: ");
-    System.out.println(new String(stream.toByteArray(), Charsets.UTF_8));
-//    System.out.println(compound);
-
-    System.out.println("Total Records Written " + batchSizes);
-
-    reader.setPosition(lastRecordCount - 2);
-    assertEquals("goodbye", reader.reader("c").readText().toString());
-    reader.setPosition(lastRecordCount - 1);
-    assertEquals("red", reader.reader("c").readText().toString());
-    assertEquals((repeatSize+1) * 2, total);
-
-    writer.clear();
-    buffer.release();
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/test/resources/parquet/null_test_data.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/null_test_data.json b/exec/java-exec/src/test/resources/parquet/null_test_data.json
index 78720d3..a6a5518 100644
--- a/exec/java-exec/src/test/resources/parquet/null_test_data.json
+++ b/exec/java-exec/src/test/resources/parquet/null_test_data.json
@@ -1,26 +1,8 @@
-{
-"int_col" : 1,
-"var_len" : "a string"
-},
-{
-"int_col" : 2
-},
-{
-"int_col" : 3
-},
-{
-"int_col" : 4
-},
-{
-"int_col" : 5
-},
-{
-"var_len" : "a longer string"
-},
-{
-"var_len" : "an even longer string"
-}
-{
-"int_col" : -1,
-"var_len" : "the end"
-}
+{ "int_col" : 1, "var_len" : "a string"}
+{ "int_col" : 2 }
+{ "int_col" : 3 }
+{ "int_col" : 4 }
+{ "int_col" : 5 }
+{ "var_len" : "a longer string" }
+{ "var_len" : "an even longer string" }
+{ "int_col" : -1, "var_len" : "the end" }

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/test/resources/store/json/test_flatten_mapify.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/json/test_flatten_mapify.json b/exec/java-exec/src/test/resources/store/json/test_flatten_mapify.json
index 7c7fb93..c767ae5 100644
--- a/exec/java-exec/src/test/resources/store/json/test_flatten_mapify.json
+++ b/exec/java-exec/src/test/resources/store/json/test_flatten_mapify.json
@@ -22,5 +22,6 @@
    },
    "x" : 2
 }
+{
    "x" : 1
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/exec/jdbc/pom.xml b/exec/jdbc/pom.xml
index c57f18b..6076326 100644
--- a/exec/jdbc/pom.xml
+++ b/exec/jdbc/pom.xml
@@ -61,6 +61,16 @@
       <classifier>tests</classifier>
     </dependency>
     <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <version>2.4.3</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.4.3</version>
+    </dependency>    
+    <dependency>
       <groupId>pentaho</groupId>
       <artifactId>mondrian-data-foodmart-queries</artifactId>
       <version>0.3</version>


Mime
View raw message