drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [2/4] drill git commit: DRILL-1774: Update JSON Reader to do single pass reading and better use Jackson's interning. Also improve projection pushdown support.
Date Mon, 01 Dec 2014 05:45:33 GMT
DRILL-1774: Update JSON Reader to do single pass reading and better use Jackson's interning.  Also improve projection pushdown support.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b218ec03
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b218ec03
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b218ec03

Branch: refs/heads/master
Commit: b218ec038603785fe8ac7449e6e175eed8dd5dfa
Parents: ec486fc
Author: Jacques Nadeau <jacques@apache.org>
Authored: Sun Nov 9 22:35:39 2014 -0800
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Sun Nov 30 20:02:33 2014 -0800

----------------------------------------------------------------------
 common/pom.xml                                  |  13 +-
 .../exec/store/mongo/MongoRecordReader.java     |  44 +-
 exec/java-exec/pom.xml                          |   2 +-
 .../codegen/templates/AbstractFieldWriter.java  |  11 +-
 .../main/codegen/templates/ComplexWriters.java  |  35 +-
 .../codegen/templates/NullableValueVectors.java |   4 +
 .../codegen/templates/RepeatedValueVectors.java |  11 +
 .../jackson/core/JsonStreamContextExposer.java  |  29 ++
 .../core/json/JsonReadContextExposer.java       |  39 ++
 .../exec/expr/fn/impl/conv/JsonConvertFrom.java |  31 +-
 .../exec/store/easy/json/JSONFormatPlugin.java  |   2 +-
 .../exec/store/easy/json/JSONRecordReader.java  | 149 ++++++
 .../exec/store/easy/json/JSONRecordReader2.java | 154 -------
 .../store/easy/json/RewindableUtf8Reader.java   | 157 +++++++
 .../vector/complex/fn/DrillBufInputStream.java  |  59 +++
 .../exec/vector/complex/fn/FieldSelection.java  | 154 +++++++
 .../exec/vector/complex/fn/JsonReader.java      | 453 +++++++++++--------
 .../vector/complex/fn/JsonReaderWithState.java  |  98 ----
 .../vector/complex/fn/JsonRecordSplitter.java   |  27 --
 .../complex/fn/JsonRecordSplitterBase.java      | 123 -----
 .../complex/fn/ReaderJSONRecordSplitter.java    | 106 -----
 .../exec/vector/complex/fn/SeekableBAIS.java    |  63 +++
 .../complex/fn/UTF8JsonRecordSplitter.java      | 106 -----
 .../exec/physical/impl/flatten/TestFlatten.java |   2 +-
 .../exec/store/json/JsonRecordReader2Test.java  |  54 ---
 .../exec/store/json/TestJsonRecordReader.java   |  54 +++
 .../vector/complex/writer/TestJsonReader.java   | 155 +------
 .../test/resources/parquet/null_test_data.json  |  34 +-
 .../store/json/test_flatten_mapify.json         |   1 +
 exec/jdbc/pom.xml                               |  10 +
 30 files changed, 1091 insertions(+), 1089 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index ffe6efe..93beb4c 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -74,9 +74,15 @@
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
-      <version>2.2.0</version>
+      <version>2.4.3</version>
     </dependency>
     <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.4.3</version>
+    </dependency>
+    
+    <dependency>
       <groupId>org.hibernate</groupId>
       <artifactId>hibernate-validator</artifactId>
       <version>4.3.1.Final</version>
@@ -88,11 +94,6 @@
       </exclusions>
     </dependency>
     <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-      <version>2.2.0</version>
-    </dependency>
-    <dependency>
       <groupId>org.antlr</groupId>
       <artifactId>antlr-runtime</artifactId>
       <version>3.4</version>

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index ad4e119..95c4b64 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -42,7 +42,7 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.complex.fn.JsonReaderWithState;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter;
 import org.slf4j.Logger;
@@ -72,7 +72,7 @@ public class MongoRecordReader extends AbstractRecordReader {
 
   private NullableVarCharVector valueVector;
 
-  private JsonReaderWithState jsonReaderWithState;
+  private JsonReader jsonReader;
   private VectorContainerWriter writer;
   private List<SchemaPath> columns;
 
@@ -172,14 +172,8 @@ public class MongoRecordReader extends AbstractRecordReader {
         throw new ExecutionSetupException(e);
       }
     } else {
-      try {
-        this.writer = new VectorContainerWriter(output);
-        this.jsonReaderWithState = new JsonReaderWithState(
-            fragmentContext.getManagedBuffer(), columns, enableAllTextMode);
-      } catch (IOException e) {
-        throw new ExecutionSetupException(
-            "Failure in Mongo JsonReader initialization.", e);
-      }
+      this.writer = new VectorContainerWriter(output);
+      this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), columns, enableAllTextMode);
     }
     logger.info("Filters Applied : " + filters);
     logger.info("Fields Selected :" + fields);
@@ -200,40 +194,18 @@ public class MongoRecordReader extends AbstractRecordReader {
       done: for (; rowCount < TARGET_RECORD_COUNT && cursor.hasNext(); rowCount++) {
         writer.setPosition(docCount);
         String doc = cursor.next().toString();
-        byte[] record = doc.getBytes(Charsets.UTF_8);
-        switch (jsonReaderWithState.write(record, writer)) {
-        case WRITE_SUCCEED:
+        jsonReader.setSource(doc.getBytes(Charsets.UTF_8));
+        if(jsonReader.write(writer)) {
           docCount++;
           break;
-
-        case WRITE_FAILED:
+        }else{
           if (docCount == 0) {
             throw new DrillRuntimeException(errMsg);
           }
-          logger.warn(errMsg, doc);
-          break done;
-
-        default:
-          break done;
         }
       }
 
-      for (SchemaPath sp : jsonReaderWithState.getNullColumns()) {
-        PathSegment root = sp.getRootSegment();
-        BaseWriter.MapWriter fieldWriter = writer.rootAsMap();
-        if (root.getChild() != null && !root.getChild().isArray()) {
-          fieldWriter = fieldWriter.map(root.getNameSegment().getPath());
-          while (root.getChild().getChild() != null
-              && !root.getChild().isArray()) {
-            fieldWriter = fieldWriter.map(root.getChild().getNameSegment()
-                .getPath());
-            root = root.getChild();
-          }
-          fieldWriter.integer(root.getChild().getNameSegment().getPath());
-        } else {
-          fieldWriter.integer(root.getNameSegment().getPath());
-        }
-      }
+      jsonReader.ensureAtLeastOneField(writer);
 
       writer.setValueCount(docCount);
       logger.debug("Took {} ms to get {} records",

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 9fd67b1..029c8d8 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -94,7 +94,7 @@
     <dependency>
       <groupId>com.fasterxml.jackson.jaxrs</groupId>
       <artifactId>jackson-jaxrs-json-provider</artifactId>
-      <version>2.2.0</version>
+      <version>2.4.3</version>
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java b/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java
index 2f90a1a..145130b 100644
--- a/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java
@@ -41,12 +41,21 @@ abstract class AbstractFieldWriter extends AbstractBaseWriter implements FieldWr
     throw new IllegalStateException(String.format("You tried to end when you are using a ValueWriter of type %s.", this.getClass().getSimpleName()));
   }
   
-  <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first /> 
+  <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+  <#assign fields = minor.fields!type.fields />
   public void write(${name}Holder holder){
     fail("${name}");
   }
   
+  public void write${minor.class}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>){
+    fail("${name}");
+  }
+  
   </#list></#list>
+
+  public void writeNull(){
+    fail("${name}");
+  }
   
   public MapWriter map(){
     fail("Map");

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
index c1e6052..5ba1c64 100644
--- a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
+++ b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
@@ -31,7 +31,7 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
 <#assign name = mode + minor.class?cap_first />
 <#assign eName = name />
 <#assign javaType = (minor.javaType!type.javaType) />
-
+<#assign fields = minor.fields!type.fields />
 
 <@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/impl/${eName}WriterImpl.java" />
 <#include "/@includes/license.ftl" />
@@ -99,6 +99,16 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
     }
   }
 
+  <#if !(minor.class == "Decimal9" || minor.class == "Decimal18" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense")>
+  public void write${minor.class}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>){
+    if(ok()){
+      // update to inform(setSafe) once available for all vector types for holders.
+      inform(mutator.addSafe(idx(), <#list fields as field>${field.name}<#if field_has_next>, </#if></#list>));
+      vector.setCurrentValueCount(idx());
+    }
+  }
+  </#if>
+  
   public void setPosition(int idx){
     if (ok()){
       super.setPosition(idx);
@@ -124,7 +134,26 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
       vector.setCurrentValueCount(idx());
     }
   }
+  
+  <#if !(minor.class == "Decimal9" || minor.class == "Decimal18" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense")>
+  public void write${minor.class}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>){
+    if(ok()){
+      // update to inform(setSafe) once available for all vector types for holders.
+      inform(mutator.setSafe(idx(), <#if mode == "Nullable">1, </#if><#list fields as field>${field.name}<#if field_has_next>, </#if></#list>));
+      vector.setCurrentValueCount(idx());
+    }
+  }
 
+  <#if mode == "Nullable">
+  public void writeNull(){
+    if(ok()){
+      inform(mutator.setNull(idx()));
+      vector.setCurrentValueCount(idx());
+    }
+  }
+  </#if>
+  </#if>
+  
   </#if>
 
 }
@@ -138,6 +167,10 @@ package org.apache.drill.exec.vector.complex.writer;
 @SuppressWarnings("unused")
 public interface ${eName}Writer extends BaseWriter{
   public void write(${minor.class}Holder h);
+  
+  <#if !(minor.class == "Decimal9" || minor.class == "Decimal18" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense")>
+  public void write${minor.class}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>);
+  </#if>
 }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
index 3989149..b222024 100644
--- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
@@ -503,6 +503,10 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
       </#if>
     }
 
+    public boolean setNull(int index){
+      return bits.getMutator().setSafe(index, 0);
+    }
+    
     public void setSkipNull(int index, ${minor.class}Holder holder){
       values.getMutator().set(index, holder);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index 3d50b00..d261050 100644
--- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -27,6 +27,7 @@ import org.mortbay.jetty.servlet.Holder;
 <#list vv.types as type>
 <#list type.minor as minor>
 <#assign friendlyType = (minor.friendlyType!minor.boxedType!type.boxedType) />
+<#assign fields = minor.fields!type.fields />
 
 <@pp.changeOutputFile name="/org/apache/drill/exec/vector/Repeated${minor.class}Vector.java" />
 <#include "/@includes/license.ftl" />
@@ -491,6 +492,16 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
       return (b1 && b2);
     }
     
+    <#if (fields?size > 1) && !(minor.class == "Decimal9" || minor.class == "Decimal18" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense")>
+    public boolean addSafe(int arrayIndex, <#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>){
+      if(offsets.getValueCapacity() <= arrayIndex+1) return false;
+      int nextOffset = offsets.getAccessor().get(arrayIndex+1);
+      boolean b1 = values.getMutator().setSafe(nextOffset, <#list fields as field>${field.name}<#if field_has_next>, </#if></#list>);
+      boolean b2 = offsets.getMutator().setSafe(arrayIndex+1, nextOffset+1);
+      return (b1 && b2);
+    }
+    </#if>
+    
     protected void add(int index, ${minor.class}Holder holder){
       int nextOffset = offsets.getAccessor().get(index+1);
       values.getMutator().set(nextOffset, holder);

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/main/java/com/fasterxml/jackson/core/JsonStreamContextExposer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/com/fasterxml/jackson/core/JsonStreamContextExposer.java b/exec/java-exec/src/main/java/com/fasterxml/jackson/core/JsonStreamContextExposer.java
new file mode 100644
index 0000000..5099c79
--- /dev/null
+++ b/exec/java-exec/src/main/java/com/fasterxml/jackson/core/JsonStreamContextExposer.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.fasterxml.jackson.core;
+
+/**
+ * Exposes certain package protected methods in Jackson.
+ */
+public class JsonStreamContextExposer {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonStreamContextExposer.class);
+
+  public static int getType(JsonStreamContext c){
+    return c._type;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/main/java/com/fasterxml/jackson/core/json/JsonReadContextExposer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/com/fasterxml/jackson/core/json/JsonReadContextExposer.java b/exec/java-exec/src/main/java/com/fasterxml/jackson/core/json/JsonReadContextExposer.java
new file mode 100644
index 0000000..820dec8
--- /dev/null
+++ b/exec/java-exec/src/main/java/com/fasterxml/jackson/core/json/JsonReadContextExposer.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.fasterxml.jackson.core.json;
+
+/**
+ * Exposes certain package protected methods in Jackson.
+ */
+public class JsonReadContextExposer {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonReadContextExposer.class);
+
+  public static int getColNmbr(JsonReadContext c){
+    return c._columnNr;
+  }
+
+  public static int getLineNmbr(JsonReadContext c){
+    return c._lineNr;
+  }
+
+
+  public static void reset(JsonReadContext c, int type, int lineNr, int colNr){
+    c.reset(type, lineNr, colNr);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java
index 7d07f6e..eb788f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
 import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
 import org.apache.drill.exec.expr.annotations.Output;
 import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.annotations.Workspace;
 import org.apache.drill.exec.expr.holders.VarBinaryHolder;
 import org.apache.drill.exec.expr.holders.VarCharHolder;
 import org.apache.drill.exec.record.RecordBatch;
@@ -45,27 +46,22 @@ public class JsonConvertFrom {
 
     @Param VarBinaryHolder in;
     @Inject DrillBuf buffer;
+    @Workspace org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
 
     @Output ComplexWriter writer;
 
     public void setup(RecordBatch incoming){
+      jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, false);
     }
 
     public void eval(){
 
-      byte[] buf = new byte[in.end - in.start];
-      in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
-      String input = new String(buf, com.google.common.base.Charsets.UTF_8);
-
       try {
-        org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, false);
-
-        jsonReader.write(new java.io.StringReader(input), writer);
+        jsonReader.setSource(in.start, in.end, in.buffer);
+        jsonReader.write(writer);
         buffer = jsonReader.getWorkBuf();
 
       } catch (Exception e) {
-//        System.out.println("Error while converting from JSON. ");
-//        e.printStackTrace();
         throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
       }
     }
@@ -75,27 +71,22 @@ public class JsonConvertFrom {
   public static class ConvertFromJsonVarchar implements DrillSimpleFunc{
 
     @Param VarCharHolder in;
-    @Output ComplexWriter writer;
     @Inject DrillBuf buffer;
+    @Workspace org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
+
+    @Output ComplexWriter writer;
 
     public void setup(RecordBatch incoming){
+      jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, false);
     }
 
     public void eval(){
-
-      byte[] buf = new byte[in.end - in.start];
-      in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
-      String input = new String(buf, com.google.common.base.Charsets.UTF_8);
-
       try {
-        org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, false);
-
-        jsonReader.write(new java.io.StringReader(input), writer);
+        jsonReader.setSource(in.start, in.end, in.buffer);
+        jsonReader.write(writer);
         buffer = jsonReader.getWorkBuf();
 
       } catch (Exception e) {
-//        System.out.println("Error while converting from JSON. ");
-//        e.printStackTrace();
         throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index e1165a2..3b5c6f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -56,7 +56,7 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
   @Override
   public RecordReader getRecordReader(FragmentContext context, FileWork fileWork,
       List<SchemaPath> columns) throws ExecutionSetupException {
-    return new JSONRecordReader2(context, fileWork.getPath(), this.getFileSystem().getUnderlying(), columns);
+    return new JSONRecordReader(context, fileWork.getPath(), this.getFileSystem().getUnderlying(), columns);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
new file mode 100644
index 0000000..557592b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.google.common.base.Stopwatch;
+
+public class JSONRecordReader extends AbstractRecordReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
+
+  private OutputMutator mutator;
+  private VectorContainerWriter writer;
+  private Path hadoopPath;
+  private FileSystem fileSystem;
+  private FSDataInputStream stream;
+  private JsonReader jsonReader;
+  private int recordCount;
+  private FragmentContext fragmentContext;
+  private OperatorContext operatorContext;
+  private List<SchemaPath> columns;
+  private boolean enableAllTextMode;
+
+  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem,
+                          List<SchemaPath> columns) throws OutOfMemoryException {
+    this.hadoopPath = new Path(inputPath);
+    this.fileSystem = fileSystem;
+    this.fragmentContext = fragmentContext;
+    this.columns = columns;
+    this.enableAllTextMode = fragmentContext.getDrillbitContext().getOptionManager().getOption(ExecConstants.JSON_ALL_TEXT_MODE).bool_val;
+  }
+
+  @Override
+  public void setup(OutputMutator output) throws ExecutionSetupException {
+    try{
+      this.stream = fileSystem.open(hadoopPath);
+      this.writer = new VectorContainerWriter(output);
+      this.mutator = output;
+      this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), columns, enableAllTextMode);
+      this.jsonReader.setSource(stream);
+    }catch(Exception e){
+      handleAndRaise("Failure reading JSON file.", e);
+    }
+  }
+
+  protected void handleAndRaise(String msg, Exception e) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(msg).append(" - Parser was at record: ").append(recordCount+1);
+    if (e instanceof JsonParseException) {
+      JsonParseException ex = JsonParseException.class.cast(e);
+      sb.append(" column: ").append(ex.getLocation().getColumnNr());
+    }
+    throw new DrillRuntimeException(sb.toString(), e);
+  }
+
+
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  public void setOperatorContext(OperatorContext operatorContext) {
+    this.operatorContext = operatorContext;
+  }
+
+  @Override
+  public int next() {
+    writer.allocate();
+    writer.reset();
+
+    recordCount = 0;
+//    Stopwatch p = new Stopwatch().start();
+    try{
+      outside: while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION){
+        writer.setPosition(recordCount);
+        boolean write = jsonReader.write(writer);
+
+        if(write){
+//          logger.debug("Wrote record.");
+          recordCount++;
+        }else{
+//          logger.debug("Exiting.");
+          break outside;
+        }
+
+      }
+
+      jsonReader.ensureAtLeastOneField(writer);
+
+      writer.setValueCount(recordCount);
+//      p.stop();
+//      System.out.println(String.format("Wrote %d records in %dms.", recordCount, p.elapsed(TimeUnit.MILLISECONDS)));
+
+      return recordCount;
+
+    } catch (JsonParseException e) {
+      handleAndRaise("Error parsing JSON.", e);
+    } catch (IOException e) {
+      handleAndRaise("Error reading JSON.", e);
+    }
+    // this is never reached
+    return 0;
+  }
+
+  @Override
+  public void cleanup() {
+    try {
+      stream.close();
+    } catch (IOException e) {
+      logger.warn("Failure while closing stream.", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
deleted file mode 100644
index d1502d4..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.easy.json;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.PathSegment;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.memory.OutOfMemoryException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.vector.BaseValueVector;
-import org.apache.drill.exec.vector.complex.fn.JsonReaderWithState;
-import org.apache.drill.exec.vector.complex.fn.JsonRecordSplitter;
-import org.apache.drill.exec.vector.complex.fn.UTF8JsonRecordSplitter;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-import org.apache.drill.exec.vector.complex.writer.BaseWriter;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.fasterxml.jackson.core.JsonParseException;
-
-public class JSONRecordReader2 extends AbstractRecordReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader2.class);
-
-  private OutputMutator mutator;
-  private VectorContainerWriter writer;
-  private Path hadoopPath;
-  private FileSystem fileSystem;
-  private InputStream stream;
-  private JsonReaderWithState jsonReader;
-  private int recordCount;
-  private FragmentContext fragmentContext;
-  private OperatorContext operatorContext;
-  private List<SchemaPath> columns;
-  private boolean enableAllTextMode;
-
-  public JSONRecordReader2(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem,
-                          List<SchemaPath> columns) throws OutOfMemoryException {
-    this.hadoopPath = new Path(inputPath);
-    this.fileSystem = fileSystem;
-    this.fragmentContext = fragmentContext;
-    this.columns = columns;
-    enableAllTextMode = fragmentContext.getDrillbitContext().getOptionManager().getOption("store.json.all_text_mode").bool_val;
-  }
-
-  @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
-    try{
-      stream = fileSystem.open(hadoopPath);
-      JsonRecordSplitter splitter = new UTF8JsonRecordSplitter(stream);
-      this.writer = new VectorContainerWriter(output);
-      this.mutator = output;
-      jsonReader = new JsonReaderWithState(splitter, fragmentContext.getManagedBuffer(), columns, enableAllTextMode);
-    }catch(Exception e){
-      handleAndRaise("Failure reading JSON file.", e);
-    }
-  }
-
-  protected void handleAndRaise(String msg, Exception e) {
-    StringBuilder sb = new StringBuilder();
-    sb.append(msg).append(" - Parser was at record: ").append(recordCount+1);
-    if (e instanceof JsonParseException) {
-      JsonParseException ex = JsonParseException.class.cast(e);
-      sb.append(" column: ").append(ex.getLocation().getColumnNr());
-    }
-    throw new DrillRuntimeException(sb.toString(), e);
-  }
-
-
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  public void setOperatorContext(OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
-  }
-
-  @Override
-  public int next() {
-    writer.allocate();
-    writer.reset();
-
-    recordCount = 0;
-
-    try{
-      outside: while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION){
-        writer.setPosition(recordCount);
-
-        switch(jsonReader.write(writer)){
-        case WRITE_SUCCEED:
-          recordCount++;
-          break;
-
-        case NO_MORE:
-          break outside;
-
-        case WRITE_FAILED:
-          break outside;
-        };
-      }
-      for (SchemaPath sp :jsonReader.getNullColumns() ) {
-        PathSegment root = sp.getRootSegment();
-        BaseWriter.MapWriter fieldWriter = writer.rootAsMap();
-        while (root.getChild() != null && ! root.getChild().isArray()) {
-          fieldWriter = fieldWriter.map(root.getNameSegment().getPath());
-          root = root.getChild();
-        }
-        fieldWriter.integer(root.getNameSegment().getPath());
-      }
-
-      writer.setValueCount(recordCount);
-      return recordCount;
-
-    } catch (JsonParseException e) {
-      handleAndRaise("Error parsing JSON.", e);
-    } catch (IOException e) {
-      handleAndRaise("Error reading JSON.", e);
-    }
-    // this is never reached
-    return 0;
-  }
-
-  @Override
-  public void cleanup() {
-    try {
-      stream.close();
-    } catch (IOException e) {
-      logger.warn("Failure while closing stream.", e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/RewindableUtf8Reader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/RewindableUtf8Reader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/RewindableUtf8Reader.java
new file mode 100644
index 0000000..b9075de
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/RewindableUtf8Reader.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.fs.Seekable;
+
+import com.fasterxml.jackson.core.JsonStreamContextExposer;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.JsonParser.Feature;
+import com.fasterxml.jackson.core.io.IOContext;
+import com.fasterxml.jackson.core.json.JsonReadContext;
+import com.fasterxml.jackson.core.json.JsonReadContextExposer;
+import com.fasterxml.jackson.core.json.UTF8StreamJsonParser;
+import com.fasterxml.jackson.core.sym.BytesToNameCanonicalizer;
+
+/**
+ * An extended version of Jackson's UTF8StreamJsonParser that supports rewinding the stream to the previous record.
+ */
+public class RewindableUtf8Reader<T extends InputStream & Seekable> extends UTF8StreamJsonParser {
+
+  private T in;
+
+  /**
+   * Parser and stream state captured by the last call to mark(), used by resetToMark()
+   * to rewind the parser to the start of the previously marked record.
+   */
+  private long markFilePos;
+  private int markInputPtr;
+  private int markInputEnd;
+  private long markInputProcessed;
+  private int markInputRow;
+  private int markInputRowStart;
+  private long markInputTotal;
+  private int markTokenInputRow;
+  private int markTokenInputCol;
+  private JsonToken markToken;
+  private JsonToken markLastToken;
+  private JsonReadContext markContext;
+  private JsonReadContext rootContext;
+
+  private int type;
+  private int lineNr;
+  private int colNr;
+
+  private boolean closed = false;
+
+  public RewindableUtf8Reader(IOContext ctxt, int features, BytesToNameCanonicalizer sym, byte[] inputBuffer) {
+    super(ctxt, features, null, null, sym, inputBuffer, 0, 0, true);
+    this.rootContext = this._parsingContext;
+  }
+
+  public void mark() throws IOException{
+    this.markFilePos = this.in.getPos();
+    this.markInputPtr = this._inputPtr;
+    this.markInputEnd = this._inputEnd;
+    this.markInputProcessed = this._currInputProcessed;
+    this.markInputRow = this._currInputRow;
+    this.markInputRowStart = this._currInputRowStart;
+    this.markInputTotal = this._tokenInputTotal;
+    this.markTokenInputCol = this._tokenInputCol;
+    this.markTokenInputRow = this._tokenInputRow;
+    this.markToken = this._currToken;
+    this.markLastToken = this._lastClearedToken;
+    this.markContext = this._parsingContext;
+    this.type = JsonStreamContextExposer.getType(markContext);
+    this.lineNr = JsonReadContextExposer.getLineNmbr(markContext);
+    this.colNr = JsonReadContextExposer.getColNmbr(markContext);
+  }
+
+  public void resetToMark() throws IOException{
+    if(markFilePos != in.getPos()){
+      in.seek(markFilePos - _inputBuffer.length);
+      in.read(_inputBuffer, 0, _inputBuffer.length);
+    }
+    this._inputPtr = this.markInputPtr;
+    this._inputEnd = this.markInputEnd;
+    this._currInputProcessed = this.markInputProcessed;
+    this._currInputRow = this.markInputRow;
+    this._currInputRowStart = this.markInputRowStart;
+    this._tokenInputTotal = this.markInputTotal;
+    this._tokenInputCol = this.markTokenInputCol;
+    this._tokenInputRow = this.markTokenInputRow;
+    this._currToken = this.markToken;
+    this._lastClearedToken = this.markLastToken;
+    this._parsingContext = this.markContext;
+    JsonReadContextExposer.reset(markContext, type, lineNr, colNr);
+
+  }
+
+  @Override
+  protected void _closeInput() throws IOException {
+    super._closeInput();
+
+      if (_inputStream != null) {
+          if (_ioContext.isResourceManaged() || isEnabled(Feature.AUTO_CLOSE_SOURCE)) {
+              _inputStream.close();
+          }
+          _inputStream = null;
+      }
+      this.closed = true;
+
+  }
+
+  public void setInputStream(T in) throws IOException{
+    if(this.in != null){
+      in.close();
+    }
+
+    this._inputStream = in;
+    this.in = in;
+    this._parsingContext = rootContext;
+    this._inputPtr = 0;
+    this._inputEnd = 0;
+    this._currInputProcessed = 0;
+    this._currInputRow = 0;
+    this._currInputRowStart = 0;
+    this._tokenInputTotal = 0;
+    this._tokenInputCol = 0;
+    this._tokenInputRow = 0;
+    this._currToken = null;
+    this._lastClearedToken = null;
+    this.closed = false;
+  }
+
+  public boolean hasDataAvailable() throws IOException{
+    return !closed;
+  }
+
+  @Override
+  public String toString() {
+    return "RewindableUtf8Reader [markFilePos=" + markFilePos + ", markInputPtr=" + markInputPtr + ", markInputEnd="
+        + markInputEnd + ", markInputProcessed=" + markInputProcessed + ", markInputRow=" + markInputRow
+        + ", markInputRowStart=" + markInputRowStart + ", markInputTotal=" + markInputTotal + ", markTokenInputRow="
+        + markTokenInputRow + ", markTokenInputCol=" + markTokenInputCol + ", markToken=" + markToken
+        + ", markLastToken=" + markLastToken + ", markContext=" + markContext + ", rootContext=" + rootContext
+        + ", type=" + type + ", lineNr=" + lineNr + ", colNr=" + colNr + "]";
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/DrillBufInputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/DrillBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/DrillBufInputStream.java
new file mode 100644
index 0000000..1061a5c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/DrillBufInputStream.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.fn;
+
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.DrillBuf;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Seekable;
+
+/**
+ * An InputStream that wraps a DrillBuf and implements the Hadoop Seekable interface.
+ */
+public class DrillBufInputStream extends ByteBufInputStream implements Seekable {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillBufInputStream.class);
+
+  private final DrillBuf buffer;
+
+  private DrillBufInputStream(DrillBuf buffer, int len) {
+    super(buffer, len);
+    this.buffer = buffer;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    buffer.readerIndex((int) pos);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return buffer.readerIndex();
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+
+  public static DrillBufInputStream getStream(int start, int end, DrillBuf buffer){
+    DrillBuf buf = buffer.slice(start, end - start);
+    return new DrillBufInputStream(buf, end - start);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/FieldSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/FieldSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/FieldSelection.java
new file mode 100644
index 0000000..60c1dee
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/FieldSelection.java
@@ -0,0 +1,154 @@
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.vector.complex.fn;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+
+import com.google.common.collect.Maps;
+
+
+/**
+ * This class manages the projection pushdown for a complex path.
+ */
+class FieldSelection {
+
+  public static final FieldSelection INVALID_NODE = new FieldSelection(null, ValidityMode.NEVER_VALID);
+  public static final FieldSelection ALL_VALID = new FieldSelection(null, ValidityMode.ALWAYS_VALID);
+
+  private enum ValidityMode {CHECK_CHILDREN, NEVER_VALID, ALWAYS_VALID}
+
+  private final Map<String, FieldSelection> children;
+  private final Map<String, FieldSelection> childrenInsensitive;
+  private final ValidityMode mode;
+
+  private FieldSelection(){
+    this(new HashMap<String, FieldSelection>(), ValidityMode.CHECK_CHILDREN);
+  }
+
+  private FieldSelection(Map<String, FieldSelection> children, ValidityMode mode){
+    this.children = children;
+    if(children != null){
+      childrenInsensitive = new TreeMap<String, FieldSelection>(String.CASE_INSENSITIVE_ORDER);
+      childrenInsensitive.putAll(children);
+    }else{
+      childrenInsensitive = null;
+    }
+    this.mode = mode;
+  }
+
+  /**
+   * Create a new tree that has all leaves fixed to support full depth validity.
+   */
+  private FieldSelection fixNodes(){
+    if(children.isEmpty()){
+      return ALL_VALID;
+    }else{
+      Map<String, FieldSelection> newMap = Maps.newHashMap();
+      for(Entry<String, FieldSelection> e : children.entrySet()){
+        newMap.put(e.getKey(), e.getValue().fixNodes());
+      }
+      return new FieldSelection(newMap, ValidityMode.CHECK_CHILDREN);
+    }
+  }
+
+  private FieldSelection addChild(String name){
+    name = name.toLowerCase();
+    if(children.containsKey(name)){
+      return children.get(name);
+    }
+
+    FieldSelection n = new FieldSelection();
+    children.put(name, n);
+    return n;
+  }
+
+  private void add(PathSegment segment){
+    if(segment.isNamed()){
+      FieldSelection child = addChild(segment.getNameSegment().getPath());
+      if(!segment.isLastPath()){
+        child.add(segment.getChild());
+      }
+    }
+  }
+
+  public boolean isNeverValid(){
+    return mode == ValidityMode.NEVER_VALID;
+  }
+
+  public FieldSelection getChild(String name){
+    switch(mode){
+    case ALWAYS_VALID:
+      return ALL_VALID;
+    case CHECK_CHILDREN:
+      FieldSelection n = children.get(name);
+
+      // if we don't find, check to see if the lower case version of this path is available, if so, we'll add it with the new case to the original map.
+      if(n == null){
+        n = childrenInsensitive.get(name);
+        if(n != null){
+          children.put(name, n);
+        }
+      }
+      if(n == null){
+        return INVALID_NODE;
+      }else{
+        return n;
+      }
+    case NEVER_VALID:
+      return INVALID_NODE;
+    default:
+      throw new IllegalStateException();
+
+    }
+  }
+
+  private static boolean containsStar(List<SchemaPath> columns) {
+    for (SchemaPath expr : columns) {
+      if (expr.getRootSegment().getPath().equals("*")) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Generates a field selection based on a list of fields.  Assumes that a partial path a.b is equivalent to a.b.*
+   * @param fields
+   * @return
+   */
+  public static FieldSelection getFieldSelection(List<SchemaPath> fields){
+    if(containsStar(fields)){
+      return ALL_VALID;
+    }else{
+      FieldSelection root = new FieldSelection();
+      for(SchemaPath p : fields){
+        root.add(p.getRootSegment());
+      }
+      return root.fixNodes();
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 0ca24e7..cc06b05 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.vector.complex.fn;
 import io.netty.buffer.DrillBuf;
 
 import java.io.IOException;
-import java.io.Reader;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -33,15 +33,22 @@ import org.apache.drill.exec.expr.holders.Float8Holder;
 import org.apache.drill.exec.expr.holders.VarCharHolder;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.easy.json.RewindableUtf8Reader;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Seekable;
 
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonParser.Feature;
 import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.io.IOContext;
+import com.fasterxml.jackson.core.sym.BytesToNameCanonicalizer;
+import com.fasterxml.jackson.core.util.BufferRecycler;
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 
@@ -49,155 +56,183 @@ public class JsonReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonReader.class);
   public final static int MAX_RECORD_SIZE = 128*1024;
 
+  private final RewindableUtf8Reader parser;
+  private DrillBuf workBuf;
+  private final List<SchemaPath> columns;
+  private final boolean allTextMode;
+  private boolean atLeastOneWrite = false;
 
+  private FieldSelection selection;
 
-  private final JsonFactory factory = new JsonFactory();
-  private JsonParser parser;
-  private DrillBuf workBuf;
-  private List<SchemaPath> columns;
-  // This is a parallel array for the field above to indicate if we have found any values in a
-  // given selected column. This allows for columns that are requested to receive a vector full of
-  // null values if no values were found in an entire read. The reason this needs to happen after
-  // all of the records have been read in a batch is to prevent a schema change when we actually find
-  // data in that column.
-  private boolean[] columnsFound;
-  // A flag set at setup time if the start column is in the requested column list, prevents
-  // doing a more computational intensive check if we are supposed to be reading a column
-  private boolean starRequested;
-  private boolean allTextMode;
+  /**
+   * Whether we are in a reset state. In a reset state, we don't have to advance to the next token on write because
+   * we're already at the start of the next object
+   */
+  private boolean onReset = false;
+
+  public static enum ReadState {WRITE_FAILURE, END_OF_STREAM, WRITE_SUCCEED}
 
   public JsonReader() throws IOException {
     this(null, false);
   }
 
-  public JsonReader(DrillBuf managedBuf, boolean allTextMode) throws IOException {
+  public JsonReader(DrillBuf managedBuf, boolean allTextMode) {
     this(managedBuf, GroupScan.ALL_COLUMNS, allTextMode);
   }
 
-  public JsonReader(DrillBuf managedBuf, List<SchemaPath> columns, boolean allTextMode) throws JsonParseException, IOException {
-    factory.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
-    factory.configure(Feature.ALLOW_COMMENTS, true);
+  public JsonReader(DrillBuf managedBuf, List<SchemaPath> columns, boolean allTextMode) {
+    BufferRecycler recycler = new BufferRecycler();
+    IOContext context = new IOContext(recycler, this, false);
+    final int features = JsonParser.Feature.collectDefaults() //
+        | Feature.ALLOW_COMMENTS.getMask() //
+        | Feature.ALLOW_UNQUOTED_FIELD_NAMES.getMask();
+
+    BytesToNameCanonicalizer can = BytesToNameCanonicalizer.createRoot();
+    parser = new RewindableUtf8Reader<>(context, features, can.makeChild(JsonFactory.Feature.collectDefaults()), context.allocReadIOBuffer());
+
     assert Preconditions.checkNotNull(columns).size() > 0 : "json record reader requires at least a column";
-    this.columns = columns;
-    this.starRequested = containsStar();
+
+    this.selection = FieldSelection.getFieldSelection(columns);
     this.workBuf = managedBuf;
     this.allTextMode = allTextMode;
-    this.columnsFound = new boolean[this.columns.size()];
+    this.columns = columns;
   }
 
-  private boolean containsStar() {
-    for (SchemaPath expr : this.columns) {
-      if (expr.getRootSegment().getPath().equals("*")) {
-        return true;
+  public void ensureAtLeastOneField(ComplexWriter writer){
+    if(!atLeastOneWrite){
+      // if we had no columns, create one empty one so we can return some data for count purposes.
+      SchemaPath sp = columns.get(0);
+      PathSegment root = sp.getRootSegment();
+      BaseWriter.MapWriter fieldWriter = writer.rootAsMap();
+      while (root.getChild() != null && ! root.getChild().isArray()) {
+        fieldWriter = fieldWriter.map(root.getNameSegment().getPath());
+        root = root.getChild();
       }
+      fieldWriter.integer(root.getNameSegment().getPath());
     }
-    return false;
   }
 
-  private boolean fieldSelected(SchemaPath field) {
-    if (starRequested) {
-      return true;
-    }
-    int i = 0;
-    for (SchemaPath expr : this.columns) {
-      if ( expr.contains(field)) {
-        columnsFound[i] = true;
-        return true;
-      }
-      i++;
-    }
-    return false;
+  public void setSource(FSDataInputStream is) throws IOException{
+    parser.setInputStream(is);
+    this.onReset = false;
   }
 
-  public List<SchemaPath> getNullColumns() {
-    ArrayList<SchemaPath> nullColumns = new ArrayList<SchemaPath>();
-    for (int i = 0; i < columnsFound.length; i++ ) {
-      if ( ! columnsFound[i] && !columns.get(i).equals(AbstractRecordReader.STAR_COLUMN)) {
-        nullColumns.add(columns.get(i));
-      }
-    }
-    return nullColumns;
+  public void setSource(int start, int end, DrillBuf buf) throws IOException{
+    parser.setInputStream(DrillBufInputStream.getStream(start, end, buf));
+  }
+
+  public void setSource(String data) throws IOException {
+    setSource(data.getBytes(Charsets.UTF_8));
+  }
+
+  public void setSource(byte[] bytes) throws IOException{
+    parser.setInputStream(new SeekableBAIS(bytes));
+    this.onReset = false;
   }
 
-  public boolean write(Reader reader, ComplexWriter writer) throws JsonParseException, IOException {
 
-    parser = factory.createJsonParser(reader);
-    reader.mark(MAX_RECORD_SIZE);
-    JsonToken t = parser.nextToken();
-    while (!parser.hasCurrentToken()) {
+  public boolean write(ComplexWriter writer) throws JsonParseException, IOException {
+
+    JsonToken t = onReset ? parser.getCurrentToken() : parser.nextToken();
+
+    while (!parser.hasCurrentToken() && parser.hasDataAvailable()) {
       t = parser.nextToken();
     }
-    return writeToVector(writer, t);
+
+    if(!parser.hasCurrentToken()){
+      return false;
+    }
+
+    if(onReset){
+      onReset = false;
+    }else{
+      parser.mark();
+    }
+
+    ReadState readState = writeToVector(writer, t);
+
+    switch(readState){
+    case END_OF_STREAM:
+      return false;
+    case WRITE_FAILURE:
+      logger.debug("Ran out of space while writing object, rewinding to object start.");
+      parser.resetToMark();
+      onReset = true;
+      return false;
+
+    case WRITE_SUCCEED:
+      return true;
+
+    }
+    throw new IllegalStateException();
+
   }
 
- public boolean write(byte[] jsonString, ComplexWriter writer) throws JsonParseException, IOException {
-    parser = factory.createJsonParser(jsonString);
-    JsonToken t = parser.nextToken();
-    while(!parser.hasCurrentToken()) {
-      t = parser.nextToken();
+  private ReadState writeToVector(ComplexWriter writer, JsonToken t) throws JsonParseException, IOException {
+    if (!writer.ok()) {
+      return ReadState.WRITE_FAILURE;
     }
-    return writeToVector(writer, t);
- }
 
-private boolean writeToVector(ComplexWriter writer, JsonToken t) throws JsonParseException, IOException {
-  if (!writer.ok()) {
-    return false;
+
+    switch (t) {
+      case START_OBJECT:
+        writeDataSwitch(writer.rootAsMap());
+        break;
+      case START_ARRAY:
+        writeDataSwitch(writer.rootAsList());
+        break;
+      case NOT_AVAILABLE:
+        return ReadState.END_OF_STREAM;
+      default:
+        throw new JsonParseException(
+            String.format("Failure while parsing JSON.  Found token of [%s]  Drill currently only supports parsing "
+                + "json strings that contain either lists or maps.  The root object cannot be a scalar.",
+                t),
+            parser.getCurrentLocation());
+      }
+
+      if(writer.ok()){
+        return ReadState.WRITE_SUCCEED;
+      }else{
+        return ReadState.WRITE_FAILURE;
+      }
   }
 
-  switch (t) {
-    case START_OBJECT:
-      writeData(writer.rootAsMap());
-      break;
-    case START_ARRAY:
-      writeData(writer.rootAsList());
-      break;
-    case NOT_AVAILABLE:
-      return false;
-    default:
-      throw new JsonParseException(
-          String.format("Failure while parsing JSON.  Found token of [%s]  Drill currently only supports parsing "
-              + "json strings that contain either lists or maps.  The root object cannot be a scalar.",
-              t),
-          parser.getCurrentLocation());
+  private void writeDataSwitch(MapWriter w) throws JsonParseException, IOException{
+    if(this.allTextMode){
+      writeDataAllText(w, this.selection);
+    }else{
+      writeData(w, this.selection);
     }
+  }
 
-    return true;
-}
+  private void writeDataSwitch(ListWriter w) throws JsonParseException, IOException{
+    if(this.allTextMode){
+      writeDataAllText(w);
+    }else{
+      writeData(w);
+    }
+  }
 
-  private void consumeEntireNextValue(JsonParser parser) throws IOException {
+  private void consumeEntireNextValue() throws IOException {
     switch (parser.nextToken()) {
       case START_ARRAY:
       case START_OBJECT:
-        int arrayAndObjectCounter = 1;
-        skipArrayLoop: while (true) {
-          switch(parser.nextToken()) {
-            case START_ARRAY:
-            case START_OBJECT:
-              arrayAndObjectCounter++;
-              break;
-            case END_ARRAY:
-            case END_OBJECT:
-              arrayAndObjectCounter--;
-              if (arrayAndObjectCounter == 0) {
-                break skipArrayLoop;
-              }
-              break;
-          }
-        }
-        break;
+        parser.skipChildren();
+        return;
       default:
         // hit a single value, do nothing as the token was already read
         // in the switch statement
-        break;
+        return;
     }
   }
 
-  private void writeData(MapWriter map) throws JsonParseException, IOException {
+  private void writeData(MapWriter map, FieldSelection selection) throws JsonParseException, IOException {
     //
     map.start();
     outside: while(true) {
       if (!map.ok()) {
-        break;
+        return;
       }
       JsonToken t = parser.nextToken();
       if (t == JsonToken.NOT_AVAILABLE || t == JsonToken.END_OBJECT) {
@@ -205,15 +240,11 @@ private boolean writeToVector(ComplexWriter writer, JsonToken t) throws JsonPars
       }
 
       assert t == JsonToken.FIELD_NAME : String.format("Expected FIELD_NAME but got %s.", t.name());
+
       final String fieldName = parser.getText();
-      SchemaPath path;
-      if (map.getField().getPath().getRootSegment().getPath().equals("")) {
-        path = new SchemaPath(new PathSegment.NameSegment(fieldName));
-      } else {
-        path = map.getField().getPath().getChild(fieldName);
-      }
-      if ( ! fieldSelected(path) ) {
-        consumeEntireNextValue(parser);
+      FieldSelection childSelection = selection.getChild(fieldName);
+      if(childSelection.isNeverValid()){
+        consumeEntireNextValue();
         continue outside;
       }
 
@@ -222,30 +253,20 @@ private boolean writeToVector(ComplexWriter writer, JsonToken t) throws JsonPars
         writeData(map.list(fieldName));
         break;
       case START_OBJECT:
-        writeData(map.map(fieldName));
+        writeData(map.map(fieldName), childSelection);
         break;
       case END_OBJECT:
         break outside;
 
       case VALUE_EMBEDDED_OBJECT:
       case VALUE_FALSE: {
-        if (allTextMode) {
-          handleString(parser, map, fieldName);
-          break;
-        }
-        BitHolder h = new BitHolder();
-        h.value = 0;
-        map.bit(fieldName).write(h);
+        map.bit(fieldName).writeBit(0);
+        atLeastOneWrite = true;
         break;
       }
       case VALUE_TRUE: {
-        if (allTextMode) {
-          handleString(parser, map, fieldName);
-          break;
-        }
-        BitHolder h = new BitHolder();
-        h.value = 1;
-        map.bit(fieldName).write(h);
+        map.bit(fieldName).writeBit(1);
+        atLeastOneWrite = true;
         break;
       }
       case VALUE_NULL:
@@ -256,25 +277,16 @@ private boolean writeToVector(ComplexWriter writer, JsonToken t) throws JsonPars
         // do nothing as we don't have a type.
         break;
       case VALUE_NUMBER_FLOAT:
-        if (allTextMode) {
-          handleString(parser, map, fieldName);
-          break;
-        }
-        Float8Holder fh = new Float8Holder();
-        fh.value = parser.getDoubleValue();
-        map.float8(fieldName).write(fh);
+        map.float8(fieldName).writeFloat8(parser.getDoubleValue());
+        atLeastOneWrite = true;
         break;
       case VALUE_NUMBER_INT:
-        if (allTextMode) {
-          handleString(parser, map, fieldName);
-          break;
-        }
-        BigIntHolder bh = new BigIntHolder();
-        bh.value = parser.getLongValue();
-        map.bigInt(fieldName).write(bh);
+        map.bigInt(fieldName).writeBigInt(parser.getLongValue());
+        atLeastOneWrite = true;
         break;
       case VALUE_STRING:
         handleString(parser, map, fieldName);
+        atLeastOneWrite = true;
         break;
 
       default:
@@ -286,41 +298,91 @@ private boolean writeToVector(ComplexWriter writer, JsonToken t) throws JsonPars
 
   }
 
+
+  private void writeDataAllText(MapWriter map, FieldSelection selection) throws JsonParseException, IOException {
+    //
+    map.start();
+    outside: while(true) {
+      if (!map.ok()) {
+        return;
+      }
+      JsonToken t = parser.nextToken();
+      if (t == JsonToken.NOT_AVAILABLE || t == JsonToken.END_OBJECT) {
+        return;
+      }
+
+      assert t == JsonToken.FIELD_NAME : String.format("Expected FIELD_NAME but got %s.", t.name());
+
+      final String fieldName = parser.getText();
+      FieldSelection childSelection = selection.getChild(fieldName);
+      if(childSelection.isNeverValid()){
+        consumeEntireNextValue();
+        continue outside;
+      }
+
+
+      switch(parser.nextToken()) {
+      case START_ARRAY:
+        writeDataAllText(map.list(fieldName));
+        break;
+      case START_OBJECT:
+        writeDataAllText(map.map(fieldName), childSelection);
+        break;
+      case END_OBJECT:
+        break outside;
+
+      case VALUE_EMBEDDED_OBJECT:
+      case VALUE_FALSE:
+      case VALUE_TRUE:
+      case VALUE_NUMBER_FLOAT:
+      case VALUE_NUMBER_INT:
+      case VALUE_STRING:
+        handleString(parser, map, fieldName);
+        atLeastOneWrite = true;
+        break;
+      case VALUE_NULL:
+        // do check value capacity only if vector is allocated.
+        if (map.getValueCapacity() > 0) {
+          map.checkValueCapacity();
+        }
+        // do nothing as we don't have a type.
+        break;
+
+
+      default:
+        throw new IllegalStateException("Unexpected token " + parser.getCurrentToken());
+
+      }
+    }
+    map.end();
+
+  }
+
+
   private void ensure(int length) {
     workBuf = workBuf.reallocIfNeeded(length);
   }
 
-  private VarCharHolder prepareVarCharHolder(VarCharHolder vh, String value) throws IOException {
+  private int prepareVarCharHolder(String value) throws IOException {
     byte[] b = value.getBytes(Charsets.UTF_8);
     ensure(b.length);
     workBuf.setBytes(0, b);
-    vh.buffer = workBuf;
-    vh.start = 0;
-    vh.end = b.length;
-    return vh;
+    return b.length;
   }
 
   private void handleString(JsonParser parser, MapWriter writer, String fieldName) throws IOException {
-    VarCharHolder vh = new VarCharHolder();
-    writer.varChar(fieldName).write(prepareVarCharHolder(vh, parser.getText()));
+    writer.varChar(fieldName).writeVarChar(0, prepareVarCharHolder(parser.getText()), workBuf);
   }
 
   private void handleString(JsonParser parser, ListWriter writer) throws IOException {
-    VarCharHolder vh = new VarCharHolder();
-    writer.varChar().write(prepareVarCharHolder(vh, parser.getText()));
-  }
-
-  private void handleString(String value, ListWriter writer) throws IOException {
-    VarCharHolder vh = new VarCharHolder();
-    writer.varChar().write(prepareVarCharHolder(vh, parser.getText()));
+    writer.varChar().writeVarChar(0, prepareVarCharHolder(parser.getText()), workBuf);
   }
 
   private void writeData(ListWriter list) throws JsonParseException, IOException {
     list.start();
     outside: while (true) {
       if (!list.ok()) {
-//        logger.warn("Error reported. Quit writing");
-        break;
+        return;
       }
 
       switch (parser.nextToken()) {
@@ -328,7 +390,7 @@ private boolean writeToVector(ComplexWriter writer, JsonToken t) throws JsonPars
         writeData(list.list());
         break;
       case START_OBJECT:
-        writeData(list.map());
+        writeData(list.map(), FieldSelection.ALL_VALID);
         break;
       case END_ARRAY:
       case END_OBJECT:
@@ -336,53 +398,66 @@ private boolean writeToVector(ComplexWriter writer, JsonToken t) throws JsonPars
 
       case VALUE_EMBEDDED_OBJECT:
       case VALUE_FALSE:{
-        if (allTextMode) {
-          handleString(parser, list);
-          break;
-        }
-        BitHolder h = new BitHolder();
-        h.value = 0;
-        list.bit().write(h);
+        list.bit().writeBit(0);
+        atLeastOneWrite = true;
         break;
       }
       case VALUE_TRUE: {
-        if (allTextMode) {
-          handleString(parser, list);
-          break;
-        }
-        BitHolder h = new BitHolder();
-        h.value = 1;
-        list.bit().write(h);
+        list.bit().writeBit(1);
+        atLeastOneWrite = true;
         break;
       }
       case VALUE_NULL:
-        if (allTextMode) {
-          handleString("null", list);
-          break;
-        }
        throw new DrillRuntimeException("Null values are not supported in lists by default. " +
-            "Please set jason_all_text_mode to true to read lists containing nulls. " +
+            "Please set `store.json.all_text_mode` to true to read lists containing nulls. " +
            "Be advised that this will treat JSON null values as strings containing the word 'null'.");
       case VALUE_NUMBER_FLOAT:
-        if (allTextMode) {
-          handleString(parser, list);
-          break;
-        }
-        Float8Holder fh = new Float8Holder();
-        fh.value = parser.getDoubleValue();
-        list.float8().write(fh);
+        list.float8().writeFloat8(parser.getDoubleValue());
+        atLeastOneWrite = true;
         break;
       case VALUE_NUMBER_INT:
-        if (allTextMode) {
-          handleString(parser, list);
-          break;
-        }
-        BigIntHolder bh = new BigIntHolder();
-        bh.value = parser.getLongValue();
-        list.bigInt().write(bh);
+        list.bigInt().writeBigInt(parser.getLongValue());
+        atLeastOneWrite = true;
+        break;
+      case VALUE_STRING:
+        handleString(parser, list);
+        atLeastOneWrite = true;
+        break;
+      default:
+        throw new IllegalStateException("Unexpected token " + parser.getCurrentToken());
+      }
+    }
+    list.end();
+
+  }
+
+  private void writeDataAllText(ListWriter list) throws JsonParseException, IOException {
+    list.start();
+    outside: while (true) {
+      if (!list.ok()) {
+        return;
+      }
+
+      switch (parser.nextToken()) {
+      case START_ARRAY:
+        writeDataAllText(list.list());
         break;
+      case START_OBJECT:
+        writeDataAllText(list.map(), FieldSelection.ALL_VALID);
+        break;
+      case END_ARRAY:
+      case END_OBJECT:
+        break outside;
+
+      case VALUE_EMBEDDED_OBJECT:
+      case VALUE_FALSE:
+      case VALUE_TRUE:
+      case VALUE_NULL:
+      case VALUE_NUMBER_FLOAT:
+      case VALUE_NUMBER_INT:
       case VALUE_STRING:
         handleString(parser, list);
+        atLeastOneWrite = true;
         break;
       default:
         throw new IllegalStateException("Unexpected token " + parser.getCurrentToken());

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java
deleted file mode 100644
index 63632a4..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.vector.complex.fn;
-
-import io.netty.buffer.DrillBuf;
-
-import java.io.IOException;
-import java.io.Reader;
-import java.util.List;
-
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
-
-import com.fasterxml.jackson.core.JsonParseException;
-
-public class JsonReaderWithState {
-
-  public static enum WriteState {
-    WRITE_SUCCEED, WRITE_FAILED, NO_MORE
-  }
-
-  private Reader reader;
-  private JsonRecordSplitter splitter;
-  private JsonReader jsonReader;
-
-  public JsonReaderWithState(JsonRecordSplitter splitter, DrillBuf workspace, List<SchemaPath> columns, boolean allTextMode) throws IOException{
-    this.splitter = splitter;
-    reader = splitter.getNextReader();
-    jsonReader = new JsonReader(workspace, columns, allTextMode);
-  }
-
-  public JsonReaderWithState(DrillBuf workspace, boolean allTextMode) throws IOException {
-    jsonReader = new JsonReader(workspace, GroupScan.ALL_COLUMNS, allTextMode);
-  }
-
-  public JsonReaderWithState(DrillBuf workspace, List<SchemaPath> columns, boolean allTextMode) throws IOException {
-    jsonReader = new JsonReader(workspace, columns, allTextMode);
-  }
-
-  public JsonReaderWithState(JsonRecordSplitter splitter) throws IOException{
-    this(splitter, null, GroupScan.ALL_COLUMNS, false);
-  }
-
-  public List<SchemaPath> getNullColumns() {
-    return jsonReader.getNullColumns();
-  }
-
-  public WriteState write(ComplexWriter writer) throws JsonParseException, IOException {
-    if (reader == null) {
-      reader = splitter.getNextReader();
-      if (reader == null) {
-        return WriteState.NO_MORE;
-      }
-    }
-
-    jsonReader.write(reader, writer);
-
-    if (!writer.ok()) {
-      reader.reset();
-      return WriteState.WRITE_FAILED;
-    } else {
-      reader = null;
-      return WriteState.WRITE_SUCCEED;
-    }
-  }
-
-  public WriteState write(byte[] bytes, ComplexWriter writer)
-      throws JsonParseException, IOException {
-    if (bytes.length == 0) {
-      return WriteState.NO_MORE;
-    }
-
-    jsonReader.write(bytes, writer);
-
-    if (!writer.ok()) {
-      return WriteState.WRITE_FAILED;
-    } else {
-      return WriteState.WRITE_SUCCEED;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitter.java
deleted file mode 100644
index 6f6e7af..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitter.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.vector.complex.fn;
-
-import java.io.IOException;
-import java.io.Reader;
-
-public interface JsonRecordSplitter {
-
-  public abstract Reader getNextReader() throws IOException;
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitterBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitterBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitterBase.java
deleted file mode 100644
index 6d81d60..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitterBase.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.vector.complex.fn;
-
-import java.io.IOException;
-import java.io.Reader;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-
-public abstract class JsonRecordSplitterBase implements JsonRecordSplitter {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReaderJSONRecordSplitter.class);
-  public final static int MAX_RECORD_SIZE = JsonReader.MAX_RECORD_SIZE;
-
-  private static final int OPEN_CBRACKET = '{';
-  private static final int OPEN_BRACKET = '[';
-  private static final int CLOSE_CBRACKET = '}';
-  private static final int CLOSE_BRACKET = ']';
-  private static final int ESCAPE = '\\';
-  private static final int LITERAL = '"';
-
-  private static final int SPACE = ' ';
-  private static final int TAB = '\t';
-  private static final int NEW_LINE = '\n';
-  private static final int FORM_FEED = '\f';
-  private static final int CR = '\r';
-
-  private long start = 0;
-
-
-  protected void preScan() throws IOException { }
-
-  protected void postScan() throws IOException { }
-
-  protected abstract int readNext() throws IOException;
-
-  protected abstract Reader createReader(long maxBytes);
-
-  @Override
-  public Reader getNextReader() throws IOException {
-    preScan();
-
-    boolean isEscaped = false;
-    boolean inLiteral = false;
-    boolean inCandidate = false;
-    boolean found = false;
-
-    long endOffset = start;
-    long curBytes = 0;
-    int cur;
-    outside: while(true) {
-      cur = readNext();
-      endOffset++;
-      curBytes = endOffset - 1 - start;
-      if (curBytes > MAX_RECORD_SIZE) {
-        throw new DrillRuntimeException(String.format("Record is too long. Max allowed record size is %s bytes.", MAX_RECORD_SIZE));
-      }
-
-      if(cur == -1) {
-        if(inCandidate){
-          found = true;
-        }
-        break;
-      }
-
-      switch(cur) {
-        case ESCAPE:
-          isEscaped = !isEscaped;
-          break;
-        case LITERAL:
-          if (!isEscaped) {
-            inLiteral = !inLiteral;
-          }
-          isEscaped = false;
-          break;
-        case CLOSE_BRACKET:
-        case CLOSE_CBRACKET:
-          inCandidate = !inLiteral;
-          break;
-        case OPEN_BRACKET:
-        case OPEN_CBRACKET:
-          if(inCandidate){
-            found = true;
-            break outside;
-          }
-          break;
-
-        case SPACE:
-        case TAB:
-        case NEW_LINE:
-        case CR:
-        case FORM_FEED:
-          break;
-
-        default:
-          inCandidate = false;
-          isEscaped = false;
-      }
-    }
-
-    if(!found) {
-      return null;
-    }
-
-    postScan();
-    start = endOffset;
-    return createReader(curBytes);
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ReaderJSONRecordSplitter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ReaderJSONRecordSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ReaderJSONRecordSplitter.java
deleted file mode 100644
index aef8b75..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ReaderJSONRecordSplitter.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.vector.complex.fn;
-
-import java.io.IOException;
-import java.io.Reader;
-import java.io.StringReader;
-
-public class ReaderJSONRecordSplitter extends JsonRecordSplitterBase {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReaderJSONRecordSplitter.class);
-
-  private Reader reader;
-
-  public ReaderJSONRecordSplitter(String str){
-    this(new StringReader(str));
-  }
-
-  public ReaderJSONRecordSplitter(Reader reader){
-    this.reader = reader;
-  }
-
-  @Override
-  protected void preScan() throws IOException {
-    reader.mark(MAX_RECORD_SIZE);
-  }
-
-  @Override
-  protected void postScan() throws IOException {
-    reader.reset();
-  }
-
-  @Override
-  protected int readNext() throws IOException {
-    return reader.read();
-  }
-
-  @Override
-  protected Reader createReader(long maxBytes) {
-    return new LimitedReader(reader, (int)maxBytes);
-  }
-
-  private class LimitedReader extends Reader {
-
-    private final Reader incoming;
-    private final int maxBytes;
-    private int markedBytes = 0;
-    private int bytes = 0;
-
-    public LimitedReader(Reader in, int maxBytes) {
-      this.maxBytes = maxBytes;
-      this.incoming = in;
-    }
-
-    @Override
-    public int read() throws IOException {
-      if (bytes >= maxBytes){
-        return -1;
-      }else{
-        bytes++;
-        return incoming.read();
-      }
-    }
-
-    @Override
-    public void mark(int readAheadLimit) throws IOException {
-      incoming.mark(readAheadLimit);
-      markedBytes = bytes;
-    }
-
-    @Override
-    public void reset() throws IOException {
-      incoming.reset();
-      bytes = markedBytes;
-    }
-
-    @Override
-    public int read(char[] cbuf, int off, int len) throws IOException {
-      int outputLength = Math.min(len, maxBytes - bytes);
-      if(outputLength > 0){
-        incoming.read(cbuf, off, outputLength);
-        bytes += outputLength;
-        return outputLength;
-      }else{
-        return -1;
-      }
-    }
-
-    @Override
-    public void close() throws IOException { }
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/b218ec03/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/SeekableBAIS.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/SeekableBAIS.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/SeekableBAIS.java
new file mode 100644
index 0000000..92eee98
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/SeekableBAIS.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.fn;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Seekable;
+
+/**
+ * A ByteArrayInputStream that supports the HDFS Seekable API.
+ */
+public class SeekableBAIS extends ByteArrayInputStream implements Seekable {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SeekableBAIS.class);
+
+
+  public SeekableBAIS(byte[] buf, int offset, int length) {
+    super(buf, offset, length);
+  }
+
+  public SeekableBAIS(byte[] buf) {
+    super(buf);
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    if(pos > buf.length){
+      throw new EOFException();
+    }
+    this.pos = (int) pos;
+    this.count = buf.length - (int) pos;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return pos;
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+
+
+
+}


Mime
View raw message