drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From par...@apache.org
Subject [3/3] drill git commit: DRILL-2193: implement fast count / skip-all semantics for JSON reader
Date Tue, 24 Mar 2015 02:30:44 GMT
DRILL-2193: implement fast count / skip-all semantics for JSON reader


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/64ad7a73
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/64ad7a73
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/64ad7a73

Branch: refs/heads/master
Commit: 64ad7a7314911a9b3ee6618c9c44b54473624109
Parents: f1b59ed
Author: Hanifi Gunes <hgunes@maprtech.com>
Authored: Wed Mar 18 16:18:37 2015 -0700
Committer: Parth Chandra <pchandra@maprtech.com>
Committed: Mon Mar 23 18:41:32 2015 -0700

----------------------------------------------------------------------
 .../exec/store/easy/json/JSONRecordReader.java  | 22 ++++----
 .../exec/store/easy/json/JsonProcessor.java     | 38 +++++++++++++
 .../easy/json/reader/BaseJsonProcessor.java     | 58 ++++++++++++++++++++
 .../easy/json/reader/CountingJsonReader.java    | 50 +++++++++++++++++
 .../exec/vector/complex/fn/JsonReader.java      | 25 +++------
 5 files changed, 163 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/64ad7a73/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index c343177..6fbdf4f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -20,11 +20,10 @@ package org.apache.drill.exec.store.easy.json;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.memory.OutOfMemoryException;
@@ -33,21 +32,17 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.easy.json.reader.CountingJsonReader;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.complex.fn.JsonReader;
-import org.apache.drill.exec.vector.complex.fn.JsonReader.ReadState;
+import org.apache.drill.exec.store.easy.json.JsonProcessor.ReadState;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-import org.apache.drill.exec.vector.complex.writer.BaseWriter;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.fasterxml.jackson.core.JsonParseException;
-import com.google.common.base.Stopwatch;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.CompressionInputStream;
 
 public class JSONRecordReader extends AbstractRecordReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
@@ -57,11 +52,10 @@ public class JSONRecordReader extends AbstractRecordReader {
   private Path hadoopPath;
   private InputStream stream;
   private DrillFileSystem fileSystem;
-  private JsonReader jsonReader;
+  private JsonProcessor jsonReader;
   private int recordCount;
   private FragmentContext fragmentContext;
   private OperatorContext operatorContext;
-  private List<SchemaPath> columns;
   private boolean enableAllTextMode;
 
  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
@@ -69,8 +63,8 @@ public class JSONRecordReader extends AbstractRecordReader {
     this.hadoopPath = new Path(inputPath);
     this.fileSystem = fileSystem;
     this.fragmentContext = fragmentContext;
-    this.columns = columns;
     this.enableAllTextMode = fragmentContext.getOptions().getOption(ExecConstants.JSON_ALL_TEXT_MODE).bool_val;
+    setColumns(columns);
   }
 
   @Override
@@ -85,7 +79,11 @@ public class JSONRecordReader extends AbstractRecordReader {
       }
       this.writer = new VectorContainerWriter(output);
       this.mutator = output;
-      this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), columns, enableAllTextMode);
+      if (isSkipQuery()) {
+        this.jsonReader = new CountingJsonReader(fragmentContext.getManagedBuffer());
+      } else {
+        this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), ImmutableList.copyOf(getColumns()), enableAllTextMode);
+      }
       this.jsonReader.setSource(stream);
     }catch(Exception e){
       handleAndRaise("Failure reading JSON file.", e);

http://git-wip-us.apache.org/repos/asf/drill/blob/64ad7a73/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
new file mode 100644
index 0000000..ce6017b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+
+public interface JsonProcessor {
+
+  public static enum ReadState {
+    WRITE_FAILURE,
+    END_OF_STREAM,
+    WRITE_SUCCEED
+  }
+
+  ReadState write(BaseWriter.ComplexWriter writer) throws IOException;
+
+  void setSource(InputStream is) throws IOException;
+
+  void ensureAtLeastOneField(BaseWriter.ComplexWriter writer);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/64ad7a73/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
new file mode 100644
index 0000000..509798a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.reader;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.io.IOContext;
+import com.fasterxml.jackson.core.sym.BytesToNameCanonicalizer;
+import com.fasterxml.jackson.core.util.BufferRecycler;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.store.easy.json.JsonProcessor;
+import org.apache.drill.exec.store.easy.json.RewindableUtf8Reader;
+
+public abstract class BaseJsonProcessor implements JsonProcessor {
+
+  protected final RewindableUtf8Reader parser;
+  protected DrillBuf workBuf;
+
+  public BaseJsonProcessor(DrillBuf workBuf) {
+    this.workBuf = Preconditions.checkNotNull(workBuf);
+    this.parser = Preconditions.checkNotNull(createParser());
+  }
+
+  protected RewindableUtf8Reader createParser() {
+    final BufferRecycler recycler = new BufferRecycler();
+    final IOContext context = new IOContext(recycler, this, false);
+    final int features = JsonParser.Feature.collectDefaults() //
+        | JsonParser.Feature.ALLOW_COMMENTS.getMask() //
+        | JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES.getMask();
+
+    final BytesToNameCanonicalizer can = BytesToNameCanonicalizer.createRoot();
+    return new RewindableUtf8Reader<>(context, features, can.makeChild(JsonFactory.Feature.collectDefaults()), context.allocReadIOBuffer());
+  }
+
+  @Override
+  public void setSource(InputStream is) throws IOException {
+    parser.setInputStream(is);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/64ad7a73/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
new file mode 100644
index 0000000..1ef71e7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.reader;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.core.JsonToken;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.store.easy.json.JsonProcessor;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+
+public class CountingJsonReader extends BaseJsonProcessor implements JsonProcessor {
+
+  public CountingJsonReader(DrillBuf workBuf) {
+    super(workBuf);
+  }
+
+  @Override
+  public ReadState write(BaseWriter.ComplexWriter writer) throws IOException {
+    final JsonToken token = parser.nextToken();
+    if (!parser.hasCurrentToken()) {
+      return ReadState.END_OF_STREAM;
+    } else if (token != JsonToken.START_OBJECT) {
+      throw new IllegalStateException(String.format("Cannot read from the middle of a record. Current token was %s", token));
+    }
+    writer.rootAsMap().bit("count").writeBit(1);
+    parser.skipChildren();
+    return writer.ok() ? ReadState.WRITE_SUCCEED : ReadState.WRITE_FAILURE;
+  }
+
+  @Override
+  public void ensureAtLeastOneField(BaseWriter.ComplexWriter writer) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/64ad7a73/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index cc5c8af..9738ff8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -33,7 +33,9 @@ import org.apache.drill.exec.expr.holders.Float8Holder;
 import org.apache.drill.exec.expr.holders.VarCharHolder;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.easy.json.JsonProcessor;
 import org.apache.drill.exec.store.easy.json.RewindableUtf8Reader;
+import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
@@ -53,12 +55,10 @@ import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.io.compress.CompressionInputStream;
 
-public class JsonReader {
+public class JsonReader extends BaseJsonProcessor implements JsonProcessor {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonReader.class);
   public final static int MAX_RECORD_SIZE = 128*1024;
 
-  private final RewindableUtf8Reader parser;
-  private DrillBuf workBuf;
   private final List<SchemaPath> columns;
   private final boolean allTextMode;
   private boolean atLeastOneWrite = false;
@@ -71,11 +71,6 @@ public class JsonReader {
    */
   private boolean onReset = false;
 
-  public static enum ReadState {
-    WRITE_FAILURE,
-    END_OF_STREAM,
-    WRITE_SUCCEED
-  }
 
   public JsonReader() throws IOException {
     this(null, false);
@@ -86,23 +81,15 @@ public class JsonReader {
   }
 
  public JsonReader(DrillBuf managedBuf, List<SchemaPath> columns, boolean allTextMode) {
-    BufferRecycler recycler = new BufferRecycler();
-    IOContext context = new IOContext(recycler, this, false);
-    final int features = JsonParser.Feature.collectDefaults() //
-        | Feature.ALLOW_COMMENTS.getMask() //
-        | Feature.ALLOW_UNQUOTED_FIELD_NAMES.getMask();
-
-    BytesToNameCanonicalizer can = BytesToNameCanonicalizer.createRoot();
-    parser = new RewindableUtf8Reader<>(context, features, can.makeChild(JsonFactory.Feature.collectDefaults()), context.allocReadIOBuffer());
+    super(managedBuf);
 
     assert Preconditions.checkNotNull(columns).size() > 0 : "json record reader requires at least a column";
-
     this.selection = FieldSelection.getFieldSelection(columns);
-    this.workBuf = managedBuf;
     this.allTextMode = allTextMode;
     this.columns = columns;
   }
 
+  @Override
   public void ensureAtLeastOneField(ComplexWriter writer){
     if(!atLeastOneWrite){
       // if we had no columns, create one empty one so we can return some data for count purposes.
@@ -117,6 +104,7 @@ public class JsonReader {
     }
   }
 
+  @Override
   public void setSource(InputStream is) throws IOException{
     parser.setInputStream(is);
     this.onReset = false;
@@ -136,6 +124,7 @@ public class JsonReader {
   }
 
 
+  @Override
   public ReadState write(ComplexWriter writer) throws IOException {
     JsonToken t = onReset ? parser.getCurrentToken() : parser.nextToken();
 


Mime
View raw message