drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [3/4] drill git commit: DRILL-4056: Fix corruption bug reading string data out of Avro
Date Thu, 26 Nov 2015 04:03:59 GMT
DRILL-4056: Fix corruption bug reading string data out of Avro

- Fix issue where we are reading a byte array without considering length
- Removed use of unnecessary Holder objects.
- Added restriction on batch size produced by a single call to next.
- Add some basic result verification to avro tests.

This closes #266


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5cc3448b
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5cc3448b
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5cc3448b

Branch: refs/heads/master
Commit: 5cc3448bc7b911325377a966160f1a21ce614468
Parents: 18a0eff
Author: Jason Altekruse <altekrusejason@gmail.com>
Authored: Fri Nov 13 15:46:58 2015 -0800
Committer: Jason Altekruse <altekrusejason@gmail.com>
Committed: Wed Nov 25 15:20:35 2015 -0800

----------------------------------------------------------------------
 .../drill/exec/store/avro/AvroRecordReader.java |  78 ++++----------
 .../impersonation/TestImpersonationQueries.java |   2 +-
 .../drill/exec/store/avro/AvroFormatTest.java   |  39 ++++++-
 .../drill/exec/store/avro/AvroTestUtil.java     | 103 +++++++++++++++++--
 4 files changed, 147 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/5cc3448b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index c12ff1a..c9cd505 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -41,13 +41,6 @@ import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.expr.holders.BigIntHolder;
-import org.apache.drill.exec.expr.holders.BitHolder;
-import org.apache.drill.exec.expr.holders.Float4Holder;
-import org.apache.drill.exec.expr.holders.Float8Holder;
-import org.apache.drill.exec.expr.holders.IntHolder;
-import org.apache.drill.exec.expr.holders.VarBinaryHolder;
-import org.apache.drill.exec.expr.holders.VarCharHolder;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
@@ -84,7 +77,7 @@ public class AvroRecordReader extends AbstractRecordReader {
   private final String opUserName;
   private final String queryUserName;
 
-  private static final int DEFAULT_BATCH_SIZE = 1000;
+  private static final int DEFAULT_BATCH_SIZE = 4096;
 
 
   public AvroRecordReader(final FragmentContext fragmentContext,
@@ -94,18 +87,6 @@ public class AvroRecordReader extends AbstractRecordReader {
                           final FileSystem fileSystem,
                           final List<SchemaPath> projectedColumns,
                           final String userName) {
-    this(fragmentContext, inputPath, start, length, fileSystem, projectedColumns, userName, DEFAULT_BATCH_SIZE);
-  }
-
-  public AvroRecordReader(final FragmentContext fragmentContext,
-                          final String inputPath,
-                          final long start,
-                          final long length,
-                          final FileSystem fileSystem,
-                          List<SchemaPath> projectedColumns,
-                          final String userName,
-                          final int defaultBatchSize) {
-
     hadoop = new Path(inputPath);
     this.start = start;
     this.end = start + length;
@@ -161,10 +142,9 @@ public class AvroRecordReader extends AbstractRecordReader {
     writer.reset();
 
     try {
-
-      // XXX - Implement batch size
-
-      for (GenericContainer container = null; reader.hasNext() && !reader.pastSync(end); recordCount++) {
+      for (GenericContainer container = null;
+           recordCount < DEFAULT_BATCH_SIZE && reader.hasNext() && !reader.pastSync(end);
+           recordCount++) {
         writer.setPosition(recordCount);
         container = reader.next(container);
         processRecord(container, container.getSchema());
@@ -284,56 +264,39 @@ public class AvroRecordReader extends AbstractRecordReader {
     switch (type) {
       case STRING:
         byte[] binary = null;
+        final int length;
         if (value instanceof Utf8) {
           binary = ((Utf8) value).getBytes();
+          length = ((Utf8) value).getByteLength();
         } else {
           binary = value.toString().getBytes(Charsets.UTF_8);
+          length = binary.length;
         }
-        final int length = binary.length;
-        final VarCharHolder vh = new VarCharHolder();
         ensure(length);
         buffer.setBytes(0, binary);
-        vh.buffer = buffer;
-        vh.start = 0;
-        vh.end = length;
-        writer.varChar(fieldName).write(vh);
+        writer.varChar(fieldName).writeVarChar(0, length, buffer);
         break;
       case INT:
-        final IntHolder ih = new IntHolder();
-        ih.value = (Integer) value;
-        writer.integer(fieldName).write(ih);
+        writer.integer(fieldName).writeInt((Integer) value);
         break;
       case LONG:
-        final BigIntHolder bh = new BigIntHolder();
-        bh.value = (Long) value;
-        writer.bigInt(fieldName).write(bh);
+        writer.bigInt(fieldName).writeBigInt((Long) value);
         break;
       case FLOAT:
-        final Float4Holder fh = new Float4Holder();
-        fh.value = (Float) value;
-        writer.float4(fieldName).write(fh);
+        writer.float4(fieldName).writeFloat4((Float) value);
         break;
       case DOUBLE:
-        final Float8Holder f8h = new Float8Holder();
-        f8h.value = (Double) value;
-        writer.float8(fieldName).write(f8h);
+        writer.float8(fieldName).writeFloat8((Double) value);
         break;
       case BOOLEAN:
-        final BitHolder bit = new BitHolder();
-        bit.value = (Boolean) value ? 1 : 0;
-        writer.bit(fieldName).write(bit);
+        writer.bit(fieldName).writeBit((Boolean) value ? 1 : 0);
         break;
       case BYTES:
-        // XXX - Not sure if this is correct. Nothing prints from sqlline for byte fields.
-        final VarBinaryHolder vb = new VarBinaryHolder();
         final ByteBuffer buf = (ByteBuffer) value;
-        final byte[] bytes = buf.array();
-        ensure(bytes.length);
-        buffer.setBytes(0, bytes);
-        vb.buffer = buffer;
-        vb.start = 0;
-        vb.end = bytes.length;
-        writer.binary(fieldName).write(vb);
+        length = buf.remaining();
+        ensure(length);
+        buffer.setBytes(0, buf);
+        writer.binary(fieldName).writeVarBinary(0, length, buffer);
         break;
       case NULL:
         // Nothing to do for null type
@@ -346,13 +309,10 @@ public class AvroRecordReader extends AbstractRecordReader {
         } catch (UnsupportedEncodingException e) {
           throw new DrillRuntimeException("Unable to read enum value for field: " + fieldName, e);
         }
-        final VarCharHolder vch = new VarCharHolder();
         ensure(b.length);
         buffer.setBytes(0, b);
-        vch.buffer = buffer;
-        vch.start = 0;
-        vch.end = b.length;
-        writer.varChar(fieldName).write(vch);
+        writer.varChar(fieldName).writeVarChar(0, b.length, buffer);
+
         break;
       default:
         throw new DrillRuntimeException("Unhandled Avro type: " + type.toString());

http://git-wip-us.apache.org/repos/asf/drill/blob/5cc3448b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
index 6709e43..1c8d7ba 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
@@ -164,7 +164,7 @@ public class TestImpersonationQueries extends BaseTestImpersonation {
     fs.setOwner(dfsFile, user, group);
     fs.setPermission(dfsFile, new FsPermission((short) 0700));
 
-    localFile = new Path(AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues());
+    localFile = new Path(AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues().getFilePath());
     dfsFile = new Path(getUserHome(user), "simple.avro");
     fs.copyFromLocalFile(localFile, dfsFile);
     fs.setOwner(dfsFile, user, group);

http://git-wip-us.apache.org/repos/asf/drill/blob/5cc3448b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
index 7ef0a50..d718342 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
@@ -33,27 +33,56 @@ public class AvroFormatTest extends BaseTestQuery {
   //      2. Avro supports recursive types? Can we test this?
 
   @Test
+  public void testBatchCutoff() throws Exception {
+
+    final AvroTestUtil.AvroTestRecordWriter testSetup = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues(5000);
+    final String file = testSetup.getFilePath();
+    final String sql =
+        "select a_string, b_int, c_long, d_float, e_double, f_bytes, h_boolean, g_null " +
+            "from dfs_test.`" + file + "`";
+    test(sql);
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .expectsNumBatches(2)
+        .baselineRecords(testSetup.getExpectedRecords())
+        .go();
+  }
+
+  @Test
   public void testSimplePrimitiveSchema_NoNullValues() throws Exception {
 
-    final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
+    final AvroTestUtil.AvroTestRecordWriter testSetup = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
+    final String file = testSetup.getFilePath();
     final String sql =
             "select a_string, b_int, c_long, d_float, e_double, f_bytes, h_boolean, g_null " +
              "from dfs_test.`" + file + "`";
     test(sql);
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineRecords(testSetup.getExpectedRecords())
+        .go();
   }
 
   @Test
   public void testSimplePrimitiveSchema_StarQuery() throws Exception {
 
-    final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
+    final AvroTestUtil.AvroTestRecordWriter testSetup = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
+    final String file = testSetup.getFilePath();
     final String sql = "select * from dfs_test.`" + file + "`";
     test(sql);
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineRecords(testSetup.getExpectedRecords())
+        .go();
   }
 
   @Test
   public void testSimplePrimitiveSchema_SelectColumnSubset() throws Exception {
 
-    final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
+    final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues().getFilePath();
     final String sql = "select h_boolean, e_double from dfs_test.`" + file + "`";
     test(sql);
   }
@@ -61,7 +90,7 @@ public class AvroFormatTest extends BaseTestQuery {
   @Test
   public void testSimplePrimitiveSchema_NoColumnsExistInTheSchema() throws Exception {
 
-    final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
+    final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues().getFilePath();
     final String sql = "select h_dummy1, e_dummy2 from dfs_test.`" + file + "`";
     try {
       test(sql);
@@ -75,7 +104,7 @@ public class AvroFormatTest extends BaseTestQuery {
   @Test
   public void testSimplePrimitiveSchema_OneExistAndOneDoesNotExistInTheSchema() throws Exception {
 
-    final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues();
+    final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues().getFilePath();
     final String sql = "select h_boolean, e_dummy2 from dfs_test.`" + file + "`";
     try {
       test(sql);

http://git-wip-us.apache.org/repos/asf/drill/blob/5cc3448b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
index be35268..053b5cc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.avro;
 
+import com.google.common.base.Charsets;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.SchemaBuilder;
@@ -26,9 +27,13 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 
+import java.io.Closeable;
 import java.io.File;
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -38,7 +43,86 @@ public class AvroTestUtil {
 
   public static final int RECORD_COUNT = 10;
 
-  public static String generateSimplePrimitiveSchema_NoNullValues() throws Exception {
+  public static class AvroTestSetup {
+    private String filePath;
+    private List<Map> expectedRecords;
+
+    public AvroTestSetup(String filePath, List<Map> expectedRecords) {
+      this.filePath = filePath;
+      this.expectedRecords = expectedRecords;
+    }
+  }
+
+  /**
+   * Class to write records to an Avro file while simultaneously
+   * constructing a corresponding list of records in the format taken in
+   * by the Drill test builder to describe expected results.
+   */
+  public static class AvroTestRecordWriter implements Closeable {
+    private final List<Map> expectedRecords;
+    GenericData.Record currentAvroRecord;
+    Map<String, Object> currentExpectedRecord;
+    private Schema schema;
+    private final DataFileWriter writer;
+    private final String filePath;
+
+    private AvroTestRecordWriter(Schema schema, File file) {
+      writer = new DataFileWriter(new GenericDatumWriter(schema));
+      try {
+        writer.create(schema, file);
+      } catch (IOException e) {
+        throw new RuntimeException("Error creating file in Avro test setup.", e);
+      }
+      this.schema = schema;
+      currentExpectedRecord = new HashMap<>();
+      expectedRecords = new ArrayList<>();
+      filePath = file.getAbsolutePath();
+    }
+
+    public void startRecord() {
+      currentAvroRecord = new GenericData.Record(schema);
+      currentExpectedRecord = new HashMap<>();
+    }
+
+    public void put(String key, Object value) {
+      currentAvroRecord.put(key, value);
+      // convert binary values into byte[], the format they will be given
+      // in the Drill result set in the test framework
+      if (value instanceof ByteBuffer) {
+        ByteBuffer bb = ((ByteBuffer)value);
+        byte[] drillVal = new byte[((ByteBuffer)value).remaining()];
+        bb.get(drillVal);
+        bb.position(0);
+        value = new String(drillVal, Charsets.UTF_8);
+      }
+      currentExpectedRecord.put("`" + key + "`", value);
+    }
+
+    public void endRecord() throws IOException {
+      writer.append(currentAvroRecord);
+      expectedRecords.add(currentExpectedRecord);
+    }
+
+    @Override
+    public void close() throws IOException {
+      writer.close();
+    }
+
+    public String getFilePath() {
+      return filePath;
+    }
+
+    public List<Map>getExpectedRecords() {
+      return expectedRecords;
+    }
+  }
+
+
+  public static AvroTestRecordWriter generateSimplePrimitiveSchema_NoNullValues() throws Exception {
+    return generateSimplePrimitiveSchema_NoNullValues(RECORD_COUNT);
+  }
+
+  public static AvroTestRecordWriter generateSimplePrimitiveSchema_NoNullValues(int numRecords) throws Exception {
 
     final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
             .namespace("org.apache.drill.exec.store.avro")
@@ -56,15 +140,14 @@ public class AvroTestUtil {
     final File file = File.createTempFile("avro-primitive-test", ".avro");
     file.deleteOnExit();
 
-    final DataFileWriter writer = new DataFileWriter(new GenericDatumWriter(schema));
+    final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
     try {
-      writer.create(schema, file);
-
       ByteBuffer bb = ByteBuffer.allocate(1);
-      bb.put(0, (byte) 1);
+      bb.put(0, (byte) 'a');
 
-      for (int i = 0; i < RECORD_COUNT; i++) {
-        final GenericRecord record = new GenericData.Record(schema);
+      for (int i = 0; i < numRecords; i++) {
+        bb.position(0);
+        record.startRecord();
         record.put("a_string", "a_" + i);
         record.put("b_int", i);
         record.put("c_long", (long) i);
@@ -73,13 +156,13 @@ public class AvroTestUtil {
         record.put("f_bytes", bb);
         record.put("g_null", null);
         record.put("h_boolean", (i % 2 == 0));
-        writer.append(record);
+        record.endRecord();
       }
     } finally {
-      writer.close();
+      record.close();
     }
 
-    return file.getAbsolutePath();
+    return record;
   }
 
   public static String generateUnionSchema_WithNullValues() throws Exception {


Mime
View raw message