drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [10/12] drill git commit: DRILL-4241: Rewrite RecordReader to support NULLs and be less Java-like
Date Mon, 11 Jan 2016 07:53:58 GMT
DRILL-4241: Rewrite RecordReader to support NULLs and be less Java-like


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4ba51553
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4ba51553
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4ba51553

Branch: refs/heads/master
Commit: 4ba515539e1715e08c953d56d5648578d4a8ba29
Parents: 6a406aa
Author: Todd Lipcon <todd@cloudera.com>
Authored: Thu Nov 19 16:15:36 2015 -0800
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Sun Jan 10 22:54:21 2016 -0800

----------------------------------------------------------------------
 .../drill/exec/store/kudu/KuduRecordReader.java | 353 ++++++++-----------
 1 file changed, 152 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/4ba51553/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
index adbdb83..6b7877d 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
@@ -35,6 +35,11 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.kudu.KuduSubScan.KuduSubScanSpec;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.BitVector;
+import org.apache.drill.exec.vector.Float4Vector;
+import org.apache.drill.exec.vector.Float8Vector;
+import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.NullableBigIntVector;
 import org.apache.drill.exec.vector.NullableBitVector;
 import org.apache.drill.exec.vector.NullableFloat4Vector;
@@ -43,7 +48,10 @@ import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.NullableTimeStampVector;
 import org.apache.drill.exec.vector.NullableVarBinaryVector;
 import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.TimeStampVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.drill.exec.vector.VarCharVector;
 import org.kududb.ColumnSchema;
 import org.kududb.Schema;
 import org.kududb.Type;
@@ -68,10 +76,15 @@ public class KuduRecordReader extends AbstractRecordReader {
   private KuduScanner scanner;
   private RowResultIterator iterator;
   
-  
   private OutputMutator output;
-  private ImmutableList<ValueVector> vectors;
-  private ImmutableList<Copier<?>> copiers;
+
+  private static class ProjectedColumnInfo {
+    int index;
+    ValueVector vv;
+    ColumnSchema kuduColumn;
+  }
+  private ImmutableList<ProjectedColumnInfo> projectedCols;
+
 
   public KuduRecordReader(KuduClient client, KuduSubScan.KuduSubScanSpec subScanSpec,
       List<SchemaPath> projectedColumns, FragmentContext context) {
@@ -104,20 +117,20 @@ public class KuduRecordReader extends AbstractRecordReader {
     }
   }
 
-  static final Map<Type, MajorType> TYPES;
+  static final Map<Type,MinorType> TYPES;
 
   static {
-    TYPES = ImmutableMap.<Type, MajorType> builder()
-        .put(Type.BINARY, Types.optional(MinorType.VARBINARY))
-        .put(Type.BOOL, Types.optional(MinorType.BIT))
-        .put(Type.DOUBLE, Types.optional(MinorType.FLOAT8))
-        .put(Type.FLOAT, Types.optional(MinorType.FLOAT4))
-        .put(Type.INT8, Types.optional(MinorType.TINYINT))
-        .put(Type.INT16, Types.optional(MinorType.SMALLINT))
-        .put(Type.INT32, Types.optional(MinorType.INT))
-        .put(Type.INT64, Types.optional(MinorType.BIGINT))
-        .put(Type.STRING, Types.optional(MinorType.VARCHAR))
-        .put(Type.TIMESTAMP, Types.optional(MinorType.TIMESTAMP))
+    TYPES = ImmutableMap.<Type, MinorType> builder()
+        .put(Type.BINARY, MinorType.VARBINARY)
+        .put(Type.BOOL, MinorType.BIT)
+        .put(Type.DOUBLE, MinorType.FLOAT8)
+        .put(Type.FLOAT, MinorType.FLOAT4)
+        .put(Type.INT8, MinorType.INT)
+        .put(Type.INT16, MinorType.INT)
+        .put(Type.INT32, MinorType.INT)
+        .put(Type.INT64, MinorType.BIGINT)
+        .put(Type.STRING, MinorType.VARCHAR)
+        .put(Type.TIMESTAMP, MinorType.TIMESTAMP)
         .build();
   }
 
@@ -138,24 +151,23 @@ public class KuduRecordReader extends AbstractRecordReader {
     } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
-    for (ValueVector vv : vectors) {
-      vv.getMutator().setValueCount(rowCount);
+    for (ProjectedColumnInfo pci : projectedCols) {
+      pci.vv.getMutator().setValueCount(rowCount);
     }
     return rowCount;
   }
   
   @SuppressWarnings("unchecked")
-  private void initCopiers(Schema schema) throws SchemaChangeException {
-    ImmutableList.Builder<ValueVector> vectorBuilder = ImmutableList.builder();
-    ImmutableList.Builder<Copier<?>> copierBuilder = ImmutableList.builder();
+  private void initCols(Schema schema) throws SchemaChangeException {
+    ImmutableList.Builder<ProjectedColumnInfo> pciBuilder = ImmutableList.builder();
     
     for (int i = 0; i < schema.getColumnCount(); i++) {
       ColumnSchema col = schema.getColumnByIndex(i);  
       
       final String name = col.getName();
       final Type kuduType = col.getType();
-      MajorType majorType = TYPES.get(kuduType);
-      if (majorType == null) {
+      MinorType minorType = TYPES.get(kuduType);
+      if (minorType == null) {
         logger.warn("Ignoring column that is unsupported.", UserException
             .unsupportedError()
             .message(
@@ -168,204 +180,143 @@ public class KuduRecordReader extends AbstractRecordReader {
 
         continue;
       }
-      MinorType minorType = majorType.getMinorType();
+      MajorType majorType;
+      if (col.isNullable()) {
+        majorType = Types.optional(minorType);
+      } else {
+        majorType = Types.required(minorType);
+      }
       MaterializedField field = MaterializedField.create(name, majorType);
       final Class<? extends ValueVector> clazz = (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(
           minorType, majorType.getMode());
       ValueVector vector = output.addField(field, clazz);
       vector.allocateNew();
-      vectorBuilder.add(vector);
-      copierBuilder.add(getCopier(kuduType, i, vector));
+      
+      ProjectedColumnInfo pci = new ProjectedColumnInfo();
+      pci.vv = vector;
+      pci.kuduColumn = col;
+      pci.index = i;
+      pciBuilder.add(pci);
     }
 
-    vectors = vectorBuilder.build();
-    copiers = copierBuilder.build();
+    projectedCols = pciBuilder.build();
   }
-
+  
   private void addRowResult(RowResult result, int rowIndex) throws SchemaChangeException {
-    if (copiers == null) {
-      initCopiers(result.getColumnProjection());
+    if (projectedCols == null) {
+      initCols(result.getColumnProjection());
     }
     
-    for (Copier<?> c : copiers) {
-      c.copy(result, rowIndex);
-    }
-  }
-
-
-  @Override
-  public void close() {
-  }
-  
-
-  private Copier<?> getCopier(Type kuduType, int offset, ValueVector v) {
-
-    if (v instanceof NullableBigIntVector) {
-      return new BigIntCopier(offset, (NullableBigIntVector.Mutator) v.getMutator());
-    } else if (v instanceof NullableFloat4Vector) {
-      return new Float4Copier(offset, (NullableFloat4Vector.Mutator) v.getMutator());
-    } else if (v instanceof NullableFloat8Vector) {
-      return new Float8Copier(offset, (NullableFloat8Vector.Mutator) v.getMutator());
-    } else if (v instanceof NullableIntVector) {
-      return new IntCopier(offset, (NullableIntVector.Mutator) v.getMutator());
-    } else if (v instanceof NullableVarCharVector) {
-      return new VarCharCopier(offset, (NullableVarCharVector.Mutator) v.getMutator());
-    } else if (v instanceof NullableVarBinaryVector) {
-      return new VarBinaryCopier(offset, (NullableVarBinaryVector.Mutator) v.getMutator());
-    } else if (v instanceof NullableTimeStampVector) {
-      return new TimeStampCopier(offset, (NullableTimeStampVector.Mutator) v.getMutator());
-    } else if (v instanceof NullableBitVector) {
-      return new BitCopier(offset, (NullableBitVector.Mutator) v.getMutator());
-    }
-
-    throw new IllegalArgumentException("Unknown how to handle vector.");
-  }
-  
-  private abstract class Copier<T extends ValueVector.Mutator> {
-    protected final int columnIndex;
-    protected final T mutator;
-
-    public Copier(int columnIndex, T mutator) {
-      this.columnIndex = columnIndex;
-      this.mutator = mutator;
-    }
-
-    abstract void copy(RowResult result, int index);
-  }
-
-  private class IntCopier extends Copier<NullableIntVector.Mutator> {
-    public IntCopier(int offset, NullableIntVector.Mutator mutator) {
-      super(offset, mutator);
-    }
-
-    @Override
-    void copy(RowResult result, int index) {
-      if (result.isNull(columnIndex)) {
-        mutator.setNull(index);
-      } else {
-        mutator.setSafe(index, result.getInt(columnIndex));
-      }
-    }
-  }
-
-  private class BigIntCopier extends Copier<NullableBigIntVector.Mutator> {
-    public BigIntCopier(int offset, NullableBigIntVector.Mutator mutator) {
-      super(offset, mutator);
-    }
-
-    @Override
-    void copy(RowResult result, int index) {
-      if (result.isNull(columnIndex)) {
-        mutator.setNull(index);
-      } else {
-        mutator.setSafe(index, result.getLong(columnIndex));
+    for (ProjectedColumnInfo pci : projectedCols) {
+      if (result.isNull(pci.index)) {
+        continue;
       }
-    }
-  }
-
-  private class Float4Copier extends Copier<NullableFloat4Vector.Mutator> {
-
-    public Float4Copier(int columnIndex, NullableFloat4Vector.Mutator mutator) {
-      super(columnIndex, mutator);
-    }
-
-    @Override
-    void copy(RowResult result, int index) {
-      if (result.isNull(columnIndex)) {
-        mutator.setNull(index);
-      } else {
-        mutator.setSafe(index, result.getFloat(columnIndex));
+      switch (pci.kuduColumn.getType()) {
+      case BINARY:
+      {
+        ByteBuffer value = result.getBinary(pci.index);
+        if (pci.kuduColumn.isNullable()) {
+          ((NullableVarBinaryVector.Mutator)pci.vv.getMutator())
+            .setSafe(rowIndex, value, 0, value.remaining());
+        } else {
+          ((VarBinaryVector.Mutator)pci.vv.getMutator())
+            .setSafe(rowIndex, value, 0, value.remaining());          
+        }
+        break;
       }
-    }
-
-  }
-
-
-  private class Float8Copier extends Copier<NullableFloat8Vector.Mutator> {
-
-    public Float8Copier(int columnIndex, NullableFloat8Vector.Mutator mutator) {
-      super(columnIndex, mutator);
-    }
-
-    @Override
-    void copy(RowResult result, int index) {
-      if (result.isNull(columnIndex)) {
-        mutator.setNull(index);
-      } else {
-        mutator.setSafe(index, result.getDouble(columnIndex));
+      case STRING:
+      {
+        ByteBuffer value = result.getBinary(pci.index);
+        if (pci.kuduColumn.isNullable()) {
+          ((NullableVarCharVector.Mutator)pci.vv.getMutator())
+            .setSafe(rowIndex, value, 0, value.remaining());
+        } else {
+          ((VarCharVector.Mutator)pci.vv.getMutator())
+            .setSafe(rowIndex, value, 0, value.remaining());          
+        }
+        break;
       }
-    }
-  }
-
-  // TODO: decimal copier
-  
-  private class VarCharCopier extends Copier<NullableVarCharVector.Mutator> {
-
-    public VarCharCopier(int columnIndex, NullableVarCharVector.Mutator mutator) {
-      super(columnIndex, mutator);
-    }
-
-    @Override
-    void copy(RowResult result, int index) {
-      if (result.isNull(columnIndex)) {
-        mutator.setNull(index);
-      } else {
-        ByteBuffer value = result.getBinary(columnIndex);
-        mutator.setSafe(index, value, 0, value.remaining());
+      case BOOL:
+        if (pci.kuduColumn.isNullable()) {
+          ((NullableBitVector.Mutator)pci.vv.getMutator())
+            .setSafe(rowIndex, result.getBoolean(pci.index) ? 1 : 0);
+        } else {
+          ((BitVector.Mutator)pci.vv.getMutator())
+            .setSafe(rowIndex, result.getBoolean(pci.index) ? 1 : 0);
+        }
+        break;
+      case DOUBLE:
+        if (pci.kuduColumn.isNullable()) {
+          ((NullableFloat8Vector.Mutator)pci.vv.getMutator())
+            .setSafe(rowIndex, result.getDouble(pci.index));
+        } else {
+          ((Float8Vector.Mutator)pci.vv.getMutator())
+            .setSafe(rowIndex, result.getDouble(pci.index));
+        }
+        break;
+      case FLOAT:
+        if (pci.kuduColumn.isNullable()) {
+          ((NullableFloat4Vector.Mutator)pci.vv.getMutator())
+            .setSafe(rowIndex, result.getFloat(pci.index));
+        } else {
+          ((Float4Vector.Mutator)pci.vv.getMutator())
+            .setSafe(rowIndex, result.getFloat(pci.index));
+        }
+        break;
+      case INT16:
+        if (pci.kuduColumn.isNullable()) {
+          ((NullableIntVector.Mutator)pci.vv.getMutator())
+            .setSafe(rowIndex, result.getShort(pci.index));
+        } else {
+          ((IntVector.Mutator)pci.vv.getMutator())
+            .setSafe(rowIndex, result.getShort(pci.index));
+        }
+        break;
+      case INT32:
+        if (pci.kuduColumn.isNullable()) {
+          ((NullableIntVector.Mutator)pci.vv.getMutator())
+            .setSafe(rowIndex, result.getInt(pci.index));
+        } else {
+          ((IntVector.Mutator)pci.vv.getMutator())
+            .setSafe(rowIndex, result.getInt(pci.index));
+        }
+        break;
+      case INT8:
+        if (pci.kuduColumn.isNullable()) {
+          ((NullableIntVector.Mutator)pci.vv.getMutator())
+            .setSafe(rowIndex, result.getByte(pci.index));
+        } else {
+          ((IntVector.Mutator)pci.vv.getMutator())
+            .setSafe(rowIndex, result.getByte(pci.index));
+        }
+        break;
+      case INT64:
+        if (pci.kuduColumn.isNullable()) {
+          ((NullableBigIntVector.Mutator)pci.vv.getMutator())
+            .setSafe(rowIndex, result.getLong(pci.index));
+        } else {
+          ((BigIntVector.Mutator)pci.vv.getMutator())
+            .setSafe(rowIndex, result.getLong(pci.index));
+        }
+        break;
+      case TIMESTAMP:
+        if (pci.kuduColumn.isNullable()) {
+          ((NullableTimeStampVector.Mutator)pci.vv.getMutator())
+            .setSafe(rowIndex, result.getLong(pci.index));
+        } else {
+          ((TimeStampVector.Mutator)pci.vv.getMutator())
+            .setSafe(rowIndex, result.getLong(pci.index));
+        }
+        break;
+      default:
+        throw new SchemaChangeException("unknown type"); // TODO make better
       }
     }
   }
 
-  private class VarBinaryCopier extends Copier<NullableVarBinaryVector.Mutator> {
-
-    public VarBinaryCopier(int columnIndex, NullableVarBinaryVector.Mutator mutator) {
-      super(columnIndex, mutator);
-    }
 
-    @Override
-    void copy(RowResult result, int index) {
-      if (result.isNull(columnIndex)) {
-        mutator.setNull(index);
-      } else {
-        ByteBuffer value = result.getBinary(columnIndex);
-        mutator.setSafe(index, value, 0, value.remaining());
-      }
-    }
+  @Override
+  public void close() {
   }
-
-  // TODO: DateCopier
-  // TODO: TimeCopier
   
-  private class TimeStampCopier extends Copier<NullableTimeStampVector.Mutator> {
-
-    public TimeStampCopier(int columnIndex, NullableTimeStampVector.Mutator mutator) {
-      super(columnIndex, mutator);
-    }
-
-    @Override
-    void copy(RowResult result, int index) {
-      if (result.isNull(columnIndex)) {
-        mutator.setNull(index);
-      } else {
-        long ts = result.getLong(columnIndex);
-        mutator.setSafe(index, ts / 1000);
-      }
-    }
-  }
-
-  private class BitCopier extends Copier<NullableBitVector.Mutator> {
-    public BitCopier(int columnIndex, NullableBitVector.Mutator mutator) {
-      super(columnIndex, mutator);
-    }
-
-    @Override
-    void copy(RowResult result, int index) {
-      if (result.isNull(columnIndex)) {
-        mutator.setNull(index);
-      } else {
-        mutator.setSafe(index, result.getBoolean(columnIndex) ? 1 : 0);
-      }
-    }
-  }
-
 }


Mime
View raw message