From: parthc@apache.org
To: commits@drill.apache.org
Date: Thu, 18 Dec 2014 21:23:54 -0000
Subject: [4/5] drill git commit: DRILL-1871: Compressed JSON read support.

DRILL-1871: Compressed JSON read support.

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e10c2197
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e10c2197
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e10c2197

Branch: refs/heads/master
Commit: e10c2197b3884c9e6879462e0e1057aacadfa47f
Parents: b83be2e
Author: Jason Altekruse
Authored: Tue Dec 16 13:57:39 2014 -0800
Committer: Parth Chandra
Committed: Wed Dec 17 22:27:28 2014 -0800

----------------------------------------------------------------------
 .../exec/store/easy/json/JSONFormatPlugin.java |  4 +-
 .../exec/store/easy/json/JSONRecordReader.java | 15 +++++-
 .../exec/vector/complex/fn/JsonReader.java     |  3 +-
 .../vector/complex/writer/TestJsonReader.java  | 54 ++++++++++++++++++++
 4 files changed, 72 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e10c2197/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 3b5c6f1..d41243d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -45,12 +45,14 @@ import com.google.common.collect.Maps;
 
 public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
 
+  private static final boolean IS_COMPRESSIBLE = true;
+
   public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig) {
     this(name, context, fs, storageConfig, new JSONFormatConfig());
   }
 
   public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, JSONFormatConfig formatPluginConfig) {
-    super(name, context, fs, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("json"), "json");
+    super(name, context, fs, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE, Lists.newArrayList("json"), "json");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/e10c2197/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 2582e34..0070d18 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.easy.json;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -36,12 +37,16 @@ import org.apache.drill.exec.vector.complex.fn.JsonReader;
 import org.apache.drill.exec.vector.complex.fn.JsonReader.ReadState;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.fasterxml.jackson.core.JsonParseException;
 import com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
 
 public class JSONRecordReader extends AbstractRecordReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
@@ -50,7 +55,7 @@ public class JSONRecordReader extends AbstractRecordReader {
   private VectorContainerWriter writer;
   private Path hadoopPath;
   private FileSystem fileSystem;
-  private FSDataInputStream stream;
+  private InputStream stream;
   private JsonReader jsonReader;
   private int recordCount;
   private FragmentContext fragmentContext;
@@ -70,7 +75,13 @@ public class JSONRecordReader extends AbstractRecordReader {
   @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
     try{
-      this.stream = fileSystem.open(hadoopPath);
+      CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
+      CompressionCodec codec = factory.getCodec(hadoopPath); // infers from file ext.
+      if (codec != null) {
+        this.stream = codec.createInputStream(fileSystem.open(hadoopPath));
+      } else {
+        this.stream = fileSystem.open(hadoopPath);
+      }
       this.writer = new VectorContainerWriter(output);
       this.mutator = output;
       this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), columns, enableAllTextMode);
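
The heart of the change is the setup() hunk above: Hadoop's CompressionCodecFactory maps the file extension (.gz, .bz2, ...) to a registered codec, and when one is found the raw FSDataInputStream is wrapped in a decompressing stream before the bytes reach the JSON parser; otherwise the plain stream is used unchanged. A minimal standalone sketch of that pattern follows; the CodecAwareOpen class and open() method are illustrative names, not part of this patch.

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecAwareOpen {
  // Opens path for reading, transparently decompressing it when the file
  // extension maps to a registered Hadoop compression codec.
  public static InputStream open(FileSystem fs, Path path, Configuration conf) throws IOException {
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    CompressionCodec codec = factory.getCodec(path); // null when no codec matches the extension
    InputStream raw = fs.open(path);
    return (codec != null) ? codec.createInputStream(raw) : raw;
  }
}

Because the decompressing stream is a CompressionInputStream rather than an FSDataInputStream, the reader's stream field and JsonReader.setSource() are widened to java.io.InputStream in the hunks that follow.
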
http://git-wip-us.apache.org/repos/asf/drill/blob/e10c2197/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index e209917..cc5c8af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -51,6 +51,7 @@ import com.fasterxml.jackson.core.sym.BytesToNameCanonicalizer;
 import com.fasterxml.jackson.core.util.BufferRecycler;
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.io.compress.CompressionInputStream;
 
 public class JsonReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonReader.class);
@@ -116,7 +117,7 @@ public class JsonReader {
     }
   }
 
-  public void setSource(FSDataInputStream is) throws IOException{
+  public void setSource(InputStream is) throws IOException{
     parser.setInputStream(is);
     this.onReset = false;
   }
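
The test added below writes a one-record JSON file into a JUnit TemporaryFolder, gzips it with a small helper, and runs the same query against both the .gz file and the plain file. For reference, a more compact try-with-resources variant of such a gzip helper might look like the sketch below; GzipHelper and gzip() are illustrative names under that assumption, not part of the patch.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

public class GzipHelper {
  // Copies sourceFile into sourceFile.gz; both streams close automatically,
  // even if an I/O error occurs mid-copy.
  public static void gzip(File sourceFile) throws IOException {
    try (FileInputStream in = new FileInputStream(sourceFile);
         GZIPOutputStream out = new GZIPOutputStream(
             new FileOutputStream(sourceFile.getPath() + ".gz"))) {
      byte[] buffer = new byte[1024];
      int len;
      while ((len = in.read(buffer)) > 0) {
        out.write(buffer, 0, len);
      }
    }
  }
}
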
http://git-wip-us.apache.org/repos/asf/drill/blob/e10c2197/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
index 4a460a4..fe86192 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
@@ -21,7 +21,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.List;
+import java.util.zip.GZIPOutputStream;
 
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.common.expression.SchemaPath;
@@ -34,16 +40,20 @@ import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.RepeatedBigIntVector;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
+import org.junit.rules.TemporaryFolder;
 
 public class TestJsonReader extends BaseTestQuery {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestJsonReader.class);
 
   private static final boolean VERBOSE_DEBUG = false;
 
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
 
   @Test
   public void schemaChange() throws Exception {
@@ -88,6 +98,50 @@ public class TestJsonReader extends BaseTestQuery {
   }
 
   @Test
+  public void testReadCompressed() throws Exception {
+    String filepath = "compressed_json.json";
+    File f = folder.newFile(filepath);
+    PrintWriter out = new PrintWriter(f);
+    out.println("{\"a\" :5}");
+    out.close();
+
+    gzipIt(f);
+    testBuilder()
+        .sqlQuery("select * from dfs.`" + f.getPath() + ".gz" + "`")
+        .unOrdered()
+        .baselineColumns("a")
+        .baselineValues(5l)
+        .build().run();
+
+    // test reading the uncompressed version as well
+    testBuilder()
+        .sqlQuery("select * from dfs.`" + f.getPath() + "`")
+        .unOrdered()
+        .baselineColumns("a")
+        .baselineValues(5l)
+        .build().run();
+  }
+
+  public void gzipIt(File sourceFile) throws IOException {
+
+    // modified from: http://www.mkyong.com/java/how-to-compress-a-file-in-gzip-format/
+    byte[] buffer = new byte[1024];
+    GZIPOutputStream gzos =
+        new GZIPOutputStream(new FileOutputStream(sourceFile.getPath() + ".gz"));
+
+    FileInputStream in =
+        new FileInputStream(sourceFile);
+
+    int len;
+    while ((len = in.read(buffer)) > 0) {
+      gzos.write(buffer, 0, len);
+    }
+    in.close();
+    gzos.finish();
+    gzos.close();
+  }
+
+  @Test
   public void testDrill_1419() throws Exception {
     String[] queries = {"select t.trans_id, t.trans_info.prod_id[0],t.trans_info.prod_id[1] from cp.`/store/json/clicks.json` t limit 5"};
     long[] rowCounts = {5};