drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From par...@apache.org
Subject [4/5] drill git commit: DRILL-1871: Compressed JSON read support.
Date Thu, 18 Dec 2014 21:23:54 GMT
DRILL-1871: Compressed JSON read support.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e10c2197
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e10c2197
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e10c2197

Branch: refs/heads/master
Commit: e10c2197b3884c9e6879462e0e1057aacadfa47f
Parents: b83be2e
Author: Jason Altekruse <altekrusejason@gmail.com>
Authored: Tue Dec 16 13:57:39 2014 -0800
Committer: Parth Chandra <pchandra@maprtech.com>
Committed: Wed Dec 17 22:27:28 2014 -0800

----------------------------------------------------------------------
 .../exec/store/easy/json/JSONFormatPlugin.java  |  4 +-
 .../exec/store/easy/json/JSONRecordReader.java  | 15 +++++-
 .../exec/vector/complex/fn/JsonReader.java      |  3 +-
 .../vector/complex/writer/TestJsonReader.java   | 54 ++++++++++++++++++++
 4 files changed, 72 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e10c2197/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 3b5c6f1..d41243d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -45,12 +45,14 @@ import com.google.common.collect.Maps;
 
 public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
 
+  private static final boolean IS_COMPRESSIBLE = true;
+
   public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig) {
     this(name, context, fs, storageConfig, new JSONFormatConfig());
   }
 
   public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, JSONFormatConfig formatPluginConfig) {
-    super(name, context, fs, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("json"), "json");
+    super(name, context, fs, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE, Lists.newArrayList("json"), "json");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/e10c2197/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 2582e34..0070d18 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.easy.json;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -36,12 +37,16 @@ import org.apache.drill.exec.vector.complex.fn.JsonReader;
 import org.apache.drill.exec.vector.complex.fn.JsonReader.ReadState;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.fasterxml.jackson.core.JsonParseException;
 import com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
 
 public class JSONRecordReader extends AbstractRecordReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
@@ -50,7 +55,7 @@ public class JSONRecordReader extends AbstractRecordReader {
   private VectorContainerWriter writer;
   private Path hadoopPath;
   private FileSystem fileSystem;
-  private FSDataInputStream stream;
+  private InputStream stream;
   private JsonReader jsonReader;
   private int recordCount;
   private FragmentContext fragmentContext;
@@ -70,7 +75,13 @@ public class JSONRecordReader extends AbstractRecordReader {
   @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
     try{
-      this.stream = fileSystem.open(hadoopPath);
+      CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
+      CompressionCodec codec = factory.getCodec(hadoopPath); // infers from file ext.
+      if (codec != null) {
+        this.stream = codec.createInputStream(fileSystem.open(hadoopPath));
+      } else {
+        this.stream = fileSystem.open(hadoopPath);
+      }
       this.writer = new VectorContainerWriter(output);
       this.mutator = output;
       this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), columns, enableAllTextMode);

http://git-wip-us.apache.org/repos/asf/drill/blob/e10c2197/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index e209917..cc5c8af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -51,6 +51,7 @@ import com.fasterxml.jackson.core.sym.BytesToNameCanonicalizer;
 import com.fasterxml.jackson.core.util.BufferRecycler;
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.io.compress.CompressionInputStream;
 
 public class JsonReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonReader.class);
@@ -116,7 +117,7 @@ public class JsonReader {
     }
   }
 
-  public void setSource(FSDataInputStream is) throws IOException{
+  public void setSource(InputStream is) throws IOException{
     parser.setInputStream(is);
     this.onReset = false;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/e10c2197/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
index 4a460a4..fe86192 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
@@ -21,7 +21,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.List;
+import java.util.zip.GZIPOutputStream;
 
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.common.expression.SchemaPath;
@@ -34,16 +40,20 @@ import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.RepeatedBigIntVector;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
+import org.junit.rules.TemporaryFolder;
 
 public class TestJsonReader extends BaseTestQuery {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestJsonReader.class);
 
   private static final boolean VERBOSE_DEBUG = false;
 
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
 
   @Test
   public void schemaChange() throws Exception {
@@ -88,6 +98,50 @@ public class TestJsonReader extends BaseTestQuery {
   }
 
   @Test
+  public void testReadCompressed() throws Exception {
+    String filepath = "compressed_json.json";
+    File f = folder.newFile(filepath);
+    PrintWriter out = new PrintWriter(f);
+    out.println("{\"a\" :5}");
+    out.close();
+
+    gzipIt(f);
+    testBuilder()
+        .sqlQuery("select * from dfs.`" + f.getPath() + ".gz" + "`")
+        .unOrdered()
+        .baselineColumns("a")
+        .baselineValues(5l)
+        .build().run();
+
+    // test reading the uncompressed version as well
+    testBuilder()
+        .sqlQuery("select * from dfs.`" + f.getPath() + "`")
+        .unOrdered()
+        .baselineColumns("a")
+        .baselineValues(5l)
+        .build().run();
+  }
+
+  public void gzipIt(File sourceFile) throws IOException {
+
+    // modified from: http://www.mkyong.com/java/how-to-compress-a-file-in-gzip-format/
+    byte[] buffer = new byte[1024];
+    GZIPOutputStream gzos =
+        new GZIPOutputStream(new FileOutputStream(sourceFile.getPath() + ".gz"));
+
+    FileInputStream in =
+        new FileInputStream(sourceFile);
+
+    int len;
+    while ((len = in.read(buffer)) > 0) {
+      gzos.write(buffer, 0, len);
+    }
+    in.close();
+    gzos.finish();
+    gzos.close();
+  }
+
+  @Test
   public void testDrill_1419() throws Exception {
     String[] queries = {"select t.trans_id, t.trans_info.prod_id[0],t.trans_info.prod_id[1] from cp.`/store/json/clicks.json` t limit 5"};
     long[] rowCounts = {5};


Mime
View raw message