Return-Path: X-Original-To: apmail-drill-commits-archive@www.apache.org Delivered-To: apmail-drill-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B9EC018E62 for ; Thu, 26 Nov 2015 04:03:57 +0000 (UTC) Received: (qmail 85341 invoked by uid 500); 26 Nov 2015 04:03:57 -0000 Delivered-To: apmail-drill-commits-archive@drill.apache.org Received: (qmail 85310 invoked by uid 500); 26 Nov 2015 04:03:57 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 85282 invoked by uid 99); 26 Nov 2015 04:03:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Nov 2015 04:03:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 441F4E17D2; Thu, 26 Nov 2015 04:03:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: json@apache.org To: commits@drill.apache.org Date: Thu, 26 Nov 2015 04:03:59 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/4] drill git commit: DRILL-4056: Fix corruption bug reading string data out of Avro DRILL-4056: Fix corruption bug reading string data out of Avro - Fix issue where we are reading a byte array without considering length - Removed use of unnecessary Holder objects. - Added restriction on batch size produced by a single call to next. - Add some basic result verification to avro tests. This closes #266 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5cc3448b Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5cc3448b Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5cc3448b Branch: refs/heads/master Commit: 5cc3448bc7b911325377a966160f1a21ce614468 Parents: 18a0eff Author: Jason Altekruse Authored: Fri Nov 13 15:46:58 2015 -0800 Committer: Jason Altekruse Committed: Wed Nov 25 15:20:35 2015 -0800 ---------------------------------------------------------------------- .../drill/exec/store/avro/AvroRecordReader.java | 78 ++++---------- .../impersonation/TestImpersonationQueries.java | 2 +- .../drill/exec/store/avro/AvroFormatTest.java | 39 ++++++- .../drill/exec/store/avro/AvroTestUtil.java | 103 +++++++++++++++++-- 4 files changed, 147 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/5cc3448b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java index c12ff1a..c9cd505 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java @@ -41,13 +41,6 @@ import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.expr.holders.BigIntHolder; -import org.apache.drill.exec.expr.holders.BitHolder; -import org.apache.drill.exec.expr.holders.Float4Holder; -import org.apache.drill.exec.expr.holders.Float8Holder; -import org.apache.drill.exec.expr.holders.IntHolder; -import org.apache.drill.exec.expr.holders.VarBinaryHolder; -import org.apache.drill.exec.expr.holders.VarCharHolder; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; @@ -84,7 +77,7 @@ public class AvroRecordReader extends AbstractRecordReader { private final String opUserName; private final String queryUserName; - private static final int DEFAULT_BATCH_SIZE = 1000; + private static final int DEFAULT_BATCH_SIZE = 4096; public AvroRecordReader(final FragmentContext fragmentContext, @@ -94,18 +87,6 @@ public class AvroRecordReader extends AbstractRecordReader { final FileSystem fileSystem, final List projectedColumns, final String userName) { - this(fragmentContext, inputPath, start, length, fileSystem, projectedColumns, userName, DEFAULT_BATCH_SIZE); - } - - public AvroRecordReader(final FragmentContext fragmentContext, - final String inputPath, - final long start, - final long length, - final FileSystem fileSystem, - List projectedColumns, - final String userName, - final int defaultBatchSize) { - hadoop = new Path(inputPath); this.start = start; this.end = start + length; @@ -161,10 +142,9 @@ public class AvroRecordReader extends AbstractRecordReader { writer.reset(); try { - - // XXX - Implement batch size - - for (GenericContainer container = null; reader.hasNext() && !reader.pastSync(end); recordCount++) { + for (GenericContainer container = null; + recordCount < DEFAULT_BATCH_SIZE && reader.hasNext() && !reader.pastSync(end); + recordCount++) { writer.setPosition(recordCount); container = reader.next(container); processRecord(container, container.getSchema()); @@ -284,56 +264,39 @@ public class AvroRecordReader extends AbstractRecordReader { switch (type) { case STRING: byte[] binary = null; + final int length; if (value instanceof Utf8) { binary = ((Utf8) value).getBytes(); + length = ((Utf8) value).getByteLength(); } else { binary = value.toString().getBytes(Charsets.UTF_8); + length = binary.length; } - final int length = binary.length; - final VarCharHolder vh = new VarCharHolder(); ensure(length); buffer.setBytes(0, binary); - vh.buffer = buffer; - vh.start = 0; - vh.end = length; - writer.varChar(fieldName).write(vh); + writer.varChar(fieldName).writeVarChar(0, length, buffer); break; case INT: - final IntHolder ih = new IntHolder(); - ih.value = (Integer) value; - writer.integer(fieldName).write(ih); + writer.integer(fieldName).writeInt((Integer) value); break; case LONG: - final BigIntHolder bh = new BigIntHolder(); - bh.value = (Long) value; - writer.bigInt(fieldName).write(bh); + writer.bigInt(fieldName).writeBigInt((Long) value); break; case FLOAT: - final Float4Holder fh = new Float4Holder(); - fh.value = (Float) value; - writer.float4(fieldName).write(fh); + writer.float4(fieldName).writeFloat4((Float) value); break; case DOUBLE: - final Float8Holder f8h = new Float8Holder(); - f8h.value = (Double) value; - writer.float8(fieldName).write(f8h); + writer.float8(fieldName).writeFloat8((Double) value); break; case BOOLEAN: - final BitHolder bit = new BitHolder(); - bit.value = (Boolean) value ? 1 : 0; - writer.bit(fieldName).write(bit); + writer.bit(fieldName).writeBit((Boolean) value ? 1 : 0); break; case BYTES: - // XXX - Not sure if this is correct. Nothing prints from sqlline for byte fields. - final VarBinaryHolder vb = new VarBinaryHolder(); final ByteBuffer buf = (ByteBuffer) value; - final byte[] bytes = buf.array(); - ensure(bytes.length); - buffer.setBytes(0, bytes); - vb.buffer = buffer; - vb.start = 0; - vb.end = bytes.length; - writer.binary(fieldName).write(vb); + length = buf.remaining(); + ensure(length); + buffer.setBytes(0, buf); + writer.binary(fieldName).writeVarBinary(0, length, buffer); break; case NULL: // Nothing to do for null type @@ -346,13 +309,10 @@ public class AvroRecordReader extends AbstractRecordReader { } catch (UnsupportedEncodingException e) { throw new DrillRuntimeException("Unable to read enum value for field: " + fieldName, e); } - final VarCharHolder vch = new VarCharHolder(); ensure(b.length); buffer.setBytes(0, b); - vch.buffer = buffer; - vch.start = 0; - vch.end = b.length; - writer.varChar(fieldName).write(vch); + writer.varChar(fieldName).writeVarChar(0, b.length, buffer); + break; default: throw new DrillRuntimeException("Unhandled Avro type: " + type.toString()); http://git-wip-us.apache.org/repos/asf/drill/blob/5cc3448b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java index 6709e43..1c8d7ba 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java @@ -164,7 +164,7 @@ public class TestImpersonationQueries extends BaseTestImpersonation { fs.setOwner(dfsFile, user, group); fs.setPermission(dfsFile, new FsPermission((short) 0700)); - localFile = new Path(AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues()); + localFile = new Path(AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues().getFilePath()); dfsFile = new Path(getUserHome(user), "simple.avro"); fs.copyFromLocalFile(localFile, dfsFile); fs.setOwner(dfsFile, user, group); http://git-wip-us.apache.org/repos/asf/drill/blob/5cc3448b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java index 7ef0a50..d718342 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java @@ -33,27 +33,56 @@ public class AvroFormatTest extends BaseTestQuery { // 2. Avro supports recursive types? Can we test this? @Test + public void testBatchCutoff() throws Exception { + + final AvroTestUtil.AvroTestRecordWriter testSetup = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues(5000); + final String file = testSetup.getFilePath(); + final String sql = + "select a_string, b_int, c_long, d_float, e_double, f_bytes, h_boolean, g_null " + + "from dfs_test.`" + file + "`"; + test(sql); + testBuilder() + .sqlQuery(sql) + .unOrdered() + .expectsNumBatches(2) + .baselineRecords(testSetup.getExpectedRecords()) + .go(); + } + + @Test public void testSimplePrimitiveSchema_NoNullValues() throws Exception { - final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues(); + final AvroTestUtil.AvroTestRecordWriter testSetup = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues(); + final String file = testSetup.getFilePath(); final String sql = "select a_string, b_int, c_long, d_float, e_double, f_bytes, h_boolean, g_null " + "from dfs_test.`" + file + "`"; test(sql); + testBuilder() + .sqlQuery(sql) + .unOrdered() + .baselineRecords(testSetup.getExpectedRecords()) + .go(); } @Test public void testSimplePrimitiveSchema_StarQuery() throws Exception { - final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues(); + final AvroTestUtil.AvroTestRecordWriter testSetup = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues(); + final String file = testSetup.getFilePath(); final String sql = "select * from dfs_test.`" + file + "`"; test(sql); + testBuilder() + .sqlQuery(sql) + .unOrdered() + .baselineRecords(testSetup.getExpectedRecords()) + .go(); } @Test public void testSimplePrimitiveSchema_SelectColumnSubset() throws Exception { - final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues(); + final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues().getFilePath(); final String sql = "select h_boolean, e_double from dfs_test.`" + file + "`"; test(sql); } @@ -61,7 +90,7 @@ public class AvroFormatTest extends BaseTestQuery { @Test public void testSimplePrimitiveSchema_NoColumnsExistInTheSchema() throws Exception { - final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues(); + final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues().getFilePath(); final String sql = "select h_dummy1, e_dummy2 from dfs_test.`" + file + "`"; try { test(sql); @@ -75,7 +104,7 @@ public class AvroFormatTest extends BaseTestQuery { @Test public void testSimplePrimitiveSchema_OneExistAndOneDoesNotExistInTheSchema() throws Exception { - final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues(); + final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues().getFilePath(); final String sql = "select h_boolean, e_dummy2 from dfs_test.`" + file + "`"; try { test(sql); http://git-wip-us.apache.org/repos/asf/drill/blob/5cc3448b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java index be35268..053b5cc 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.avro; +import com.google.common.base.Charsets; import org.apache.avro.Schema; import org.apache.avro.Schema.Type; import org.apache.avro.SchemaBuilder; @@ -26,9 +27,13 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; +import java.io.Closeable; import java.io.File; +import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -38,7 +43,86 @@ public class AvroTestUtil { public static final int RECORD_COUNT = 10; - public static String generateSimplePrimitiveSchema_NoNullValues() throws Exception { + public static class AvroTestSetup { + private String filePath; + private List expectedRecords; + + public AvroTestSetup(String filePath, List expectedRecords) { + this.filePath = filePath; + this.expectedRecords = expectedRecords; + } + } + + /** + * Class to write records to an Avro file while simultaneously + * constructing a corresponding list of records in the format taken in + * by the Drill test builder to describe expected results. + */ + public static class AvroTestRecordWriter implements Closeable { + private final List expectedRecords; + GenericData.Record currentAvroRecord; + Map currentExpectedRecord; + private Schema schema; + private final DataFileWriter writer; + private final String filePath; + + private AvroTestRecordWriter(Schema schema, File file) { + writer = new DataFileWriter(new GenericDatumWriter(schema)); + try { + writer.create(schema, file); + } catch (IOException e) { + throw new RuntimeException("Error creating file in Avro test setup.", e); + } + this.schema = schema; + currentExpectedRecord = new HashMap<>(); + expectedRecords = new ArrayList<>(); + filePath = file.getAbsolutePath(); + } + + public void startRecord() { + currentAvroRecord = new GenericData.Record(schema); + currentExpectedRecord = new HashMap<>(); + } + + public void put(String key, Object value) { + currentAvroRecord.put(key, value); + // convert binary values into byte[], the format they will be given + // in the Drill result set in the test framework + if (value instanceof ByteBuffer) { + ByteBuffer bb = ((ByteBuffer)value); + byte[] drillVal = new byte[((ByteBuffer)value).remaining()]; + bb.get(drillVal); + bb.position(0); + value = new String(drillVal, Charsets.UTF_8); + } + currentExpectedRecord.put("`" + key + "`", value); + } + + public void endRecord() throws IOException { + writer.append(currentAvroRecord); + expectedRecords.add(currentExpectedRecord); + } + + @Override + public void close() throws IOException { + writer.close(); + } + + public String getFilePath() { + return filePath; + } + + public ListgetExpectedRecords() { + return expectedRecords; + } + } + + + public static AvroTestRecordWriter generateSimplePrimitiveSchema_NoNullValues() throws Exception { + return generateSimplePrimitiveSchema_NoNullValues(RECORD_COUNT); + } + + public static AvroTestRecordWriter generateSimplePrimitiveSchema_NoNullValues(int numRecords) throws Exception { final Schema schema = SchemaBuilder.record("AvroRecordReaderTest") .namespace("org.apache.drill.exec.store.avro") @@ -56,15 +140,14 @@ public class AvroTestUtil { final File file = File.createTempFile("avro-primitive-test", ".avro"); file.deleteOnExit(); - final DataFileWriter writer = new DataFileWriter(new GenericDatumWriter(schema)); + final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file); try { - writer.create(schema, file); - ByteBuffer bb = ByteBuffer.allocate(1); - bb.put(0, (byte) 1); + bb.put(0, (byte) 'a'); - for (int i = 0; i < RECORD_COUNT; i++) { - final GenericRecord record = new GenericData.Record(schema); + for (int i = 0; i < numRecords; i++) { + bb.position(0); + record.startRecord(); record.put("a_string", "a_" + i); record.put("b_int", i); record.put("c_long", (long) i); @@ -73,13 +156,13 @@ public class AvroTestUtil { record.put("f_bytes", bb); record.put("g_null", null); record.put("h_boolean", (i % 2 == 0)); - writer.append(record); + record.endRecord(); } } finally { - writer.close(); + record.close(); } - return file.getAbsolutePath(); + return record; } public static String generateUnionSchema_WithNullValues() throws Exception {