drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [06/45] drill git commit: DRILL-3987: (MOVE) Extract key vector, field reader, complex/field writer classes.
Date Fri, 13 Nov 2015 02:37:36 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/9969d8bd/exec/vector/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
new file mode 100644
index 0000000..fe91fe6
--- /dev/null
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -0,0 +1,636 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.NullableVectorDefinitionSetter;
+
+import java.lang.Override;
+import java.lang.UnsupportedOperationException;
+
+<@pp.dropOutputFile />
+<#list vv.types as type>
+<#list type.minor as minor>
+
+<#assign className = "Nullable${minor.class}Vector" />
+<#assign valuesName = "${minor.class}Vector" />
+<#assign friendlyType = (minor.friendlyType!minor.boxedType!type.boxedType) />
+
+<@pp.changeOutputFile name="/org/apache/drill/exec/vector/${className}.java" />
+
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.vector;
+
+<#include "/@includes/vv_imports.ftl" />
+
+/**
+ * Nullable${minor.class} implements a vector of values which could be null.  Elements in the vector
+ * are first checked against a fixed length vector of boolean values.  Then the element is retrieved
+ * from the base class (if not null).
+ *
+ * NB: this class is automatically generated from ${.template_name} and ValueVectorTypes.tdd using FreeMarker.
+ */
+@SuppressWarnings("unused")
+public final class ${className} extends BaseDataValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, NullableVector{
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${className}.class);
+
+  private final FieldReader reader = new Nullable${minor.class}ReaderImpl(Nullable${minor.class}Vector.this);
+
+  // Per-value "is set" flags, stored in a required UINT1 vector named "$bits$"; 0 means null.
+  private final MaterializedField bitsField = MaterializedField.create("$bits$", Types.required(MinorType.UINT1));
+  private final UInt1Vector bits = new UInt1Vector(bitsField, allocator);
+  // The underlying non-nullable vector holding the actual values.
+  private final ${valuesName} values = new ${minor.class}Vector(field, allocator);
+
+  // Declared after bits/values: the Accessor's field initializers read both of them.
+  private final Mutator mutator = new Mutator();
+  private final Accessor accessor = new Accessor();
+
+  /**
+   * Creates the vector; the field and the allocator are handed to the base class.
+   *
+   * @param field      field definition of this vector
+   * @param allocator  allocator passed on to the base class
+   */
+  public ${className}(MaterializedField field, BufferAllocator allocator) {
+    super(field, allocator);
+  }
+
+  /** Returns the reader instance created together with this vector. */
+  @Override
+  public FieldReader getReader() {
+    return reader;
+  }
+
+  /** Returns the smaller of the bits vector's and the values vector's value capacities. */
+  @Override
+  public int getValueCapacity() {
+    final int bitsCapacity = bits.getValueCapacity();
+    final int valuesCapacity = values.getValueCapacity();
+    return Math.min(bitsCapacity, valuesCapacity);
+  }
+
+  /**
+   * Returns the buffers of the bits vector followed by the buffers of the values vector.
+   *
+   * @param clear  when true, every returned buffer is retained once and this vector is cleared
+   * @return the concatenated buffer array
+   */
+  @Override
+  public DrillBuf[] getBuffers(boolean clear) {
+    final DrillBuf[] bitBuffers = bits.getBuffers(false);
+    final DrillBuf[] valueBuffers = values.getBuffers(false);
+    final DrillBuf[] buffers = ObjectArrays.concat(bitBuffers, valueBuffers, DrillBuf.class);
+    if (!clear) {
+      return buffers;
+    }
+    for (final DrillBuf buffer : buffers) {
+      buffer.retain(1);
+    }
+    clear();
+    return buffers;
+  }
+
+  /** Closes the bits vector, then the values vector, then delegates to the base class. */
+  @Override
+  public void close() {
+    bits.close();
+    values.close();
+    super.close();
+  }
+
+  /** Clears the bits vector, then the values vector, then delegates to the base class. */
+  @Override
+  public void clear() {
+    bits.clear();
+    values.clear();
+    super.clear();
+  }
+
+  /** Returns the buffer size of the values vector plus that of the bits vector. */
+  @Override
+  public int getBufferSize() {
+    final int valuesSize = values.getBufferSize();
+    final int bitsSize = bits.getBufferSize();
+    return valuesSize + bitsSize;
+  }
+
+  /**
+   * Returns the combined buffer size the values and bits vectors report for the given
+   * number of values; zero values always yields zero.
+   *
+   * @param valueCount  number of values to size for
+   */
+  @Override
+  public int getBufferSizeFor(final int valueCount) {
+    return valueCount == 0
+        ? 0
+        : values.getBufferSizeFor(valueCount) + bits.getBufferSizeFor(valueCount);
+  }
+
+  /** Returns the buffer of the values vector only; the bits buffer is not part of the result. */
+  @Override
+  public DrillBuf getBuffer() {
+    return values.getBuffer();
+  }
+
+  /** Returns the underlying values vector (the same instance this vector writes to). */
+  @Override
+  public ${valuesName} getValuesVector() {
+    return values;
+  }
+
+  /**
+   * Forwards the initial capacity to both the bits vector and the values vector.
+   *
+   * @param numRecords  capacity hint passed to both child vectors
+   */
+  @Override
+  public void setInitialCapacity(int numRecords) {
+    bits.setInitialCapacity(numRecords);
+    values.setInitialCapacity(numRecords);
+  }
+
+  /**
+   * Extends the base metadata with two children: the bits metadata first, the values
+   * metadata second. load() reads the children back by position, so the order matters.
+   */
+  @Override
+  public SerializedField.Builder getMetadataBuilder() {
+    return super.getMetadataBuilder()
+      .addChild(bits.getMetadata())
+      .addChild(values.getMetadata());
+  }
+
+  /**
+   * Allocates via {@link #allocateNewSafe()} and converts a failed allocation into an exception.
+   *
+   * @throws OutOfMemoryRuntimeException if allocateNewSafe() reports failure
+   */
+  @Override
+  public void allocateNew() {
+    final boolean allocated = allocateNewSafe();
+    if (!allocated) {
+      throw new OutOfMemoryRuntimeException("Failure while allocating buffer.");
+    }
+  }
+
+  /**
+   * Allocates the values vector and then the bits vector.
+   *
+   * @return true only if both allocations succeeded
+   */
+  @Override
+  public boolean allocateNewSafe() {
+    // Both child vectors must allocate. If either one fails (or throws), anything that
+    // was already allocated is released again through clear().
+    boolean allocated = false;
+    try {
+      if (values.allocateNewSafe()) {
+        allocated = bits.allocateNewSafe();
+      }
+    } finally {
+      if (!allocated) {
+        clear();
+      }
+    }
+    // These steps run on the success path and on the (non-throwing) failure path alike.
+    bits.zeroVector();
+    mutator.reset();
+    accessor.reset();
+    return allocated;
+  }
+
+  <#if type.major == "VarLen">
+  /**
+   * Allocates the values vector for the given byte and value counts and the bits vector for
+   * the given value count, then zeroes the bits and resets the mutator and accessor.
+   * If a DrillRuntimeException is thrown while allocating, this vector is cleared and the
+   * exception is rethrown.
+   *
+   * @param totalBytes  byte count passed to the values vector
+   * @param valueCount  value count passed to both child vectors
+   */
+  @Override
+  public void allocateNew(int totalBytes, int valueCount) {
+    try {
+      values.allocateNew(totalBytes, valueCount);
+      bits.allocateNew(valueCount);
+    } catch (DrillRuntimeException e) {
+      clear();
+      throw e;
+    }
+    bits.zeroVector();
+    mutator.reset();
+    accessor.reset();
+  }
+
+  /**
+   * Zeroes the bits vector, resets the mutator and accessor state, then delegates to the
+   * base class reset.
+   */
+  @Override
+  public void reset() {
+    bits.zeroVector();
+    mutator.reset();
+    accessor.reset();
+    super.reset();
+  }
+
+  /** Returns the byte capacity reported by the values vector. */
+  @Override
+  public int getByteCapacity() {
+    return values.getByteCapacity();
+  }
+
+  /** Returns the current size in bytes reported by the values vector. */
+  @Override
+  public int getCurrentSizeInBytes() {
+    return values.getCurrentSizeInBytes();
+  }
+
+  <#else>
+  /**
+   * Allocates the values vector for {@code valueCount} entries and the bits vector for
+   * {@code valueCount + 1} entries, then zeroes the bits and resets the mutator and accessor.
+   * If an OutOfMemoryRuntimeException is thrown while allocating, this vector is cleared
+   * and the exception is rethrown.
+   *
+   * @param valueCount  number of values to allocate for
+   */
+  @Override
+  public void allocateNew(int valueCount) {
+    try {
+      values.allocateNew(valueCount);
+      bits.allocateNew(valueCount + 1);
+    } catch (OutOfMemoryRuntimeException e) {
+      clear();
+      throw e;
+    }
+    bits.zeroVector();
+    mutator.reset();
+    accessor.reset();
+  }
+
+  /**
+   * Zeroes the bits vector, resets the mutator and accessor state, then delegates to the
+   * base class reset.
+   */
+  @Override
+  public void reset() {
+    bits.zeroVector();
+    mutator.reset();
+    accessor.reset();
+    super.reset();
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Zeroes both the bits vector and the values vector.
+   */
+  @Override
+  public void zeroVector() {
+    bits.zeroVector();
+    values.zeroVector();
+  }
+  </#if>
+
+
+  /**
+   * Clears this vector and loads both child vectors from a single buffer: the bits vector
+   * from the start of the buffer, the values vector from the slice that follows the bits.
+   *
+   * @param metadata  serialized field whose child 0 describes the bits and child 1 the values
+   * @param buffer    buffer holding the bits data followed by the values data
+   */
+  @Override
+  public void load(SerializedField metadata, DrillBuf buffer) {
+    clear();
+    // Children are read back by position, in the order getMetadataBuilder adds them:
+    // bits first, values second.
+    final SerializedField bitsMetadata = metadata.getChild(0);
+    bits.load(bitsMetadata, buffer);
+
+    final int totalLength = buffer.capacity();
+    final int valuesOffset = bitsMetadata.getBufferLength();
+    final SerializedField valuesMetadata = metadata.getChild(1);
+    final int valuesLength = totalLength - valuesOffset;
+    values.load(valuesMetadata, buffer.slice(valuesOffset, valuesLength));
+  }
+
+  /** Creates a transfer pair whose target is a new vector built from this vector's field. */
+  @Override
+  public TransferPair getTransferPair() {
+    final MaterializedField targetField = getField();
+    return new TransferImpl(targetField);
+  }
+
+  /**
+   * Creates a transfer pair whose target is a new vector built from this vector's field
+   * with its path replaced by {@code ref}.
+   */
+  @Override
+  public TransferPair getTransferPair(FieldReference ref) {
+    final MaterializedField targetField = getField().withPath(ref);
+    return new TransferImpl(targetField);
+  }
+
+  /**
+   * Creates a transfer pair that targets an existing vector.
+   *
+   * @param to  target vector; it is cast to Nullable${minor.class}Vector
+   */
+  @Override
+  public TransferPair makeTransferPair(ValueVector to) {
+    final Nullable${minor.class}Vector target = (Nullable${minor.class}Vector) to;
+    return new TransferImpl(target);
+  }
+
+  /**
+   * Transfers the bits and the values to {@code target} and then clears this vector.
+   * For variable-width types the target mutator also takes over this mutator's lastSet.
+   *
+   * @param target  vector that receives the data
+   */
+  public void transferTo(Nullable${minor.class}Vector target){
+    bits.transferTo(target.bits);
+    values.transferTo(target.values);
+    <#if type.major == "VarLen">
+    target.mutator.lastSet = mutator.lastSet;
+    </#if>
+    clear();
+  }
+
+  /**
+   * Split-transfers {@code length} entries starting at {@code startIndex} of both child
+   * vectors into {@code target}. This vector is not cleared.
+   * For variable-width types the target mutator's lastSet becomes {@code length - 1}.
+   *
+   * @param startIndex  first entry to transfer
+   * @param length      number of entries to transfer
+   * @param target      vector that receives the entries
+   */
+  public void splitAndTransferTo(int startIndex, int length, Nullable${minor.class}Vector target) {
+    bits.splitAndTransferTo(startIndex, length, target.bits);
+    values.splitAndTransferTo(startIndex, length, target.values);
+    <#if type.major == "VarLen">
+    target.mutator.lastSet = length - 1;
+    </#if>
+  }
+
+  /**
+   * Transfer pair that moves or copies data from the enclosing vector into a target
+   * Nullable${minor.class}Vector.
+   */
+  private class TransferImpl implements TransferPair {
+    // Assigned exactly once by either constructor.
+    final Nullable${minor.class}Vector to;
+
+    /** Targets a newly created vector for the given field, using the enclosing vector's allocator. */
+    public TransferImpl(MaterializedField field){
+      to = new Nullable${minor.class}Vector(field, allocator);
+    }
+
+    /** Targets an existing vector. */
+    public TransferImpl(Nullable${minor.class}Vector to){
+      this.to = to;
+    }
+
+    @Override
+    public Nullable${minor.class}Vector getTo(){
+      return to;
+    }
+
+    /** Delegates to the enclosing vector's transferTo. */
+    @Override
+    public void transfer(){
+      transferTo(to);
+    }
+
+    /** Delegates to the enclosing vector's splitAndTransferTo. */
+    @Override
+    public void splitAndTransfer(int startIndex, int length) {
+      splitAndTransferTo(startIndex, length, to);
+    }
+
+    /** Copies one entry of the enclosing vector into the target via copyFromSafe. */
+    @Override
+    public void copyValueSafe(int fromIndex, int toIndex) {
+      to.copyFromSafe(fromIndex, toIndex, Nullable${minor.class}Vector.this);
+    }
+  }
+
+  /** Returns this vector's single accessor instance. */
+  @Override
+  public Accessor getAccessor() {
+    return accessor;
+  }
+
+  /** Returns this vector's single mutator instance. */
+  @Override
+  public Mutator getMutator() {
+    return mutator;
+  }
+
+  /**
+   * Builds a new ${minor.class}Vector that shares this vector's values buffer, then clears
+   * this vector. The bits are not carried over.
+   *
+   * @return the new vector, created for getField().getOtherNullableVersion()
+   */
+  public ${minor.class}Vector convertToRequiredVector(){
+    ${minor.class}Vector v = new ${minor.class}Vector(getField().getOtherNullableVersion(), allocator);
+    // Drop whatever buffer the fresh vector came with before pointing it at ours.
+    if (v.data != null) {
+      v.data.release(1);
+    }
+    v.data = values.data;
+    // Retain before clear() so the shared buffer is still referenced by v afterwards.
+    v.data.retain(1);
+    clear();
+    return v;
+  }
+
+  /**
+   * Copies one entry from another nullable vector. A null source entry is skipped, which
+   * leaves the target slot untouched.
+   *
+   * @param fromIndex  index of the entry in {@code from}
+   * @param thisIndex  index to write in this vector
+   * @param from       source vector
+   */
+  public void copyFrom(int fromIndex, int thisIndex, Nullable${minor.class}Vector from) {
+    final Accessor source = from.getAccessor();
+    if (source.isNull(fromIndex)) {
+      return;
+    }
+    mutator.set(thisIndex, source.get(fromIndex));
+  }
+
+  /**
+   * Copies one entry from a required (non-nullable) vector and marks the target entry as set.
+   * For variable-width types, fillEmpties is called for the target index first.
+   *
+   * @param fromIndex  index of the entry in {@code from}
+   * @param thisIndex  index to write in this vector
+   * @param from       source vector
+   */
+  public void copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from){
+    <#if type.major == "VarLen">
+    mutator.fillEmpties(thisIndex);
+    </#if>
+    values.copyFromSafe(fromIndex, thisIndex, from);
+    bits.getMutator().setSafe(thisIndex, 1);
+  }
+
+  /**
+   * Copies one entry, both its bit and its value, from another nullable vector.
+   * For variable-width types, fillEmpties is called for the target index first.
+   *
+   * @param fromIndex  index of the entry in {@code from}
+   * @param thisIndex  index to write in this vector
+   * @param from       source vector
+   */
+  public void copyFromSafe(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){
+    <#if type.major == "VarLen">
+    mutator.fillEmpties(thisIndex);
+    </#if>
+    bits.copyFromSafe(fromIndex, thisIndex, from.bits);
+    values.copyFromSafe(fromIndex, thisIndex, from.values);
+  }
+
+  /**
+   * Read access to the enclosing vector: the bits vector decides whether an entry is null,
+   * the values vector supplies the data.
+   */
+  public final class Accessor extends BaseDataValueVector.BaseAccessor <#if type.major == "VarLen">implements VariableWidthVector.VariableWidthAccessor</#if> {
+    final UInt1Vector.Accessor bAccessor = bits.getAccessor();
+    final ${valuesName}.Accessor vAccessor = values.getAccessor();
+
+    /**
+     * Get the element at the specified position.
+     *
+     * @param   index   position of the value
+     * @return  value of the element, if not null
+     * @throws  IllegalStateException if the value is null
+     */
+    public <#if type.major == "VarLen">byte[]<#else>${minor.javaType!type.javaType}</#if> get(int index) {
+      if (isNull(index)) {
+        throw new IllegalStateException("Can't get a null value");
+      }
+      return vAccessor.get(index);
+    }
+
+    /** Returns true when the bit stored for {@code index} is 0. */
+    @Override
+    public boolean isNull(int index) {
+      return isSet(index) == 0;
+    }
+
+    /** Returns the raw bit stored for {@code index}; 0 means null. */
+    public int isSet(int index){
+      return bAccessor.get(index);
+    }
+
+    <#if type.major == "VarLen">
+    /** Delegates to the values accessor's getStartEnd. */
+    public long getStartEnd(int index){
+      return vAccessor.getStartEnd(index);
+    }
+
+    /** Delegates to the values accessor's getValueLength. */
+    @Override
+    public int getValueLength(int index) {
+      return values.getAccessor().getValueLength(index);
+    }
+    </#if>
+
+    /**
+     * Fills the holder from the values vector and sets its isSet flag from the bits vector.
+     * The value is read regardless of whether the entry is null.
+     */
+    public void get(int index, Nullable${minor.class}Holder holder){
+      vAccessor.get(index, holder);
+      holder.isSet = bAccessor.get(index);
+
+      <#if minor.class.startsWith("Decimal")>
+      holder.scale = getField().getScale();
+      holder.precision = getField().getPrecision();
+      </#if>
+    }
+
+    /** Returns the object form of the entry, or null when the entry is null. */
+    @Override
+    public ${friendlyType} getObject(int index) {
+      if (isNull(index)) {
+        return null;
+      } else {
+        return vAccessor.getObject(index);
+      }
+    }
+
+    <#if minor.class == "Interval" || minor.class == "IntervalDay" || minor.class == "IntervalYear">
+    /** Returns the string-builder form of the entry, or null when the entry is null. */
+    public StringBuilder getAsStringBuilder(int index) {
+      if (isNull(index)) {
+        return null;
+      } else {
+        return vAccessor.getAsStringBuilder(index);
+      }
+    }
+    </#if>
+
+    /** Returns the value count of the bits vector. */
+    @Override
+    public int getValueCount(){
+      return bits.getAccessor().getValueCount();
+    }
+
+    /** No accessor state to reset. */
+    public void reset(){}
+  }
+
+  /**
+   * Write access to the enclosing vector. Every setter updates the bits vector and the values
+   * vector; the variable-width variant additionally tracks the last written index (lastSet)
+   * so that skipped entries can be filled with empty values.
+   */
+  public final class Mutator extends BaseDataValueVector.BaseMutator implements NullableVectorDefinitionSetter<#if type.major == "VarLen">, VariableWidthVector.VariableWidthMutator</#if> {
+    private int setCount;
+    <#if type.major == "VarLen">
+    // Index of the most recently written entry; -1 after reset().
+    private int lastSet = -1;
+    </#if>
+
+    private Mutator(){
+    }
+
+    /** Returns the underlying values vector. */
+    public ${valuesName} getVectorWithValues(){
+      return values;
+    }
+
+    /** Marks the entry at {@code index} as set without touching its value. */
+    @Override
+    public void setIndexDefined(int index){
+      bits.getMutator().set(index, 1);
+    }
+
+    /**
+     * Set the element at the specified index to the supplied value and mark it as set.
+     * For variable-width types, entries between the last written index and {@code index}
+     * are first filled with empty values.
+     *
+     * @param index   position of the element to set
+     * @param value   value to write
+     */
+    public void set(int index, <#if type.major == "VarLen">byte[]<#elseif (type.width < 4)>int<#else>${minor.javaType!type.javaType}</#if> value) {
+      setCount++;
+      final ${valuesName}.Mutator valuesMutator = values.getMutator();
+      final UInt1Vector.Mutator bitsMutator = bits.getMutator();
+      <#if type.major == "VarLen">
+      for (int i = lastSet + 1; i < index; i++) {
+        valuesMutator.set(i, emptyByteArray);
+      }
+      </#if>
+      bitsMutator.set(index, 1);
+      valuesMutator.set(index, value);
+      <#if type.major == "VarLen">lastSet = index;</#if>
+    }
+
+    <#if type.major == "VarLen">
+
+    /**
+     * Writes empty values into the entries after lastSet up to and including {@code index},
+     * re-allocates the bits vector while {@code index} exceeds its capacity, and records
+     * {@code index} as lastSet.
+     */
+    private void fillEmpties(int index){
+      final ${valuesName}.Mutator valuesMutator = values.getMutator();
+      for (int i = lastSet; i < index; i++) {
+        valuesMutator.setSafe(i + 1, emptyByteArray);
+      }
+      while(index > bits.getValueCapacity()) {
+        bits.reAlloc();
+      }
+      lastSet = index;
+    }
+
+    /** Delegates to the values mutator and records {@code index} as lastSet. */
+    @Override
+    public void setValueLengthSafe(int index, int length) {
+      values.getMutator().setValueLengthSafe(index, length);
+      lastSet = index;
+    }
+    </#if>
+
+    /**
+     * Writes a byte range at {@code index} and marks the entry as set.
+     * Only generated with a body for variable-width types.
+     *
+     * @throws UnsupportedOperationException for fixed-width types
+     */
+    public void setSafe(int index, byte[] value, int start, int length) {
+      <#if type.major != "VarLen">
+      throw new UnsupportedOperationException();
+      <#else>
+      fillEmpties(index);
+
+      bits.getMutator().setSafe(index, 1);
+      values.getMutator().setSafe(index, value, start, length);
+      setCount++;
+      lastSet = index;
+      </#if>
+    }
+
+    /**
+     * Writes a byte range taken from a ByteBuffer at {@code index} and marks the entry as set.
+     * Only generated with a body for variable-width types.
+     *
+     * @throws UnsupportedOperationException for fixed-width types
+     */
+    public void setSafe(int index, ByteBuffer value, int start, int length) {
+      <#if type.major != "VarLen">
+      throw new UnsupportedOperationException();
+      <#else>
+      fillEmpties(index);
+
+      bits.getMutator().setSafe(index, 1);
+      values.getMutator().setSafe(index, value, start, length);
+      setCount++;
+      lastSet = index;
+      </#if>
+    }
+
+    /** Marks the entry at {@code index} as null; the value slot is left as it is. */
+    public void setNull(int index){
+      bits.getMutator().setSafe(index, 0);
+    }
+
+    /** Writes only the value from the holder; the bits vector is not updated. */
+    public void setSkipNull(int index, ${minor.class}Holder holder){
+      values.getMutator().set(index, holder);
+    }
+
+    /** Writes only the value from the holder; the holder's isSet flag is ignored. */
+    public void setSkipNull(int index, Nullable${minor.class}Holder holder){
+      values.getMutator().set(index, holder);
+    }
+
+
+    /** Writes the holder's value and copies its isSet flag into the bits vector. */
+    public void set(int index, Nullable${minor.class}Holder holder){
+      final ${valuesName}.Mutator valuesMutator = values.getMutator();
+      <#if type.major == "VarLen">
+      for (int i = lastSet + 1; i < index; i++) {
+        valuesMutator.set(i, emptyByteArray);
+      }
+      </#if>
+      bits.getMutator().set(index, holder.isSet);
+      valuesMutator.set(index, holder);
+      <#if type.major == "VarLen">lastSet = index;</#if>
+    }
+
+    /** Writes the holder's value and marks the entry as set. */
+    public void set(int index, ${minor.class}Holder holder){
+      final ${valuesName}.Mutator valuesMutator = values.getMutator();
+      <#if type.major == "VarLen">
+      for (int i = lastSet + 1; i < index; i++) {
+        valuesMutator.set(i, emptyByteArray);
+      }
+      </#if>
+      bits.getMutator().set(index, 1);
+      valuesMutator.set(index, holder);
+      <#if type.major == "VarLen">lastSet = index;</#if>
+    }
+
+    /** Returns true when {@code outIndex} is below the enclosing vector's value capacity. */
+    public boolean isSafe(int outIndex) {
+      return outIndex < Nullable${minor.class}Vector.this.getValueCapacity();
+    }
+
+    <#assign fields = minor.fields!type.fields />
+    /** Writes the given field values at {@code index} and stores {@code isSet} in the bits vector. */
+    public void set(int index, int isSet<#list fields as field><#if field.include!true >, ${field.type} ${field.name}Field</#if></#list> ){
+      final ${valuesName}.Mutator valuesMutator = values.getMutator();
+      <#if type.major == "VarLen">
+      for (int i = lastSet + 1; i < index; i++) {
+        valuesMutator.set(i, emptyByteArray);
+      }
+      </#if>
+      bits.getMutator().set(index, isSet);
+      valuesMutator.set(index<#list fields as field><#if field.include!true >, ${field.name}Field</#if></#list>);
+      <#if type.major == "VarLen">lastSet = index;</#if>
+    }
+
+    /** Same as the field-wise set, but uses the setSafe variants of both child mutators. */
+    public void setSafe(int index, int isSet<#list fields as field><#if field.include!true >, ${field.type} ${field.name}Field</#if></#list> ) {
+      <#if type.major == "VarLen">
+      fillEmpties(index);
+      </#if>
+
+      bits.getMutator().setSafe(index, isSet);
+      values.getMutator().setSafe(index<#list fields as field><#if field.include!true >, ${field.name}Field</#if></#list>);
+      setCount++;
+      <#if type.major == "VarLen">lastSet = index;</#if>
+    }
+
+
+    /** Writes the holder's value and copies its isSet flag, using the setSafe variants. */
+    public void setSafe(int index, Nullable${minor.class}Holder value) {
+
+      <#if type.major == "VarLen">
+      fillEmpties(index);
+      </#if>
+      bits.getMutator().setSafe(index, value.isSet);
+      values.getMutator().setSafe(index, value);
+      setCount++;
+      <#if type.major == "VarLen">lastSet = index;</#if>
+    }
+
+    /** Writes the holder's value and marks the entry as set, using the setSafe variants. */
+    public void setSafe(int index, ${minor.class}Holder value) {
+
+      <#if type.major == "VarLen">
+      fillEmpties(index);
+      </#if>
+      bits.getMutator().setSafe(index, 1);
+      values.getMutator().setSafe(index, value);
+      setCount++;
+      <#if type.major == "VarLen">lastSet = index;</#if>
+    }
+
+    <#if !(type.major == "VarLen" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense" || minor.class == "Interval" || minor.class == "IntervalDay")>
+    /**
+     * Writes a primitive value and marks the entry as set. This overload is never generated
+     * for variable-width types, so no empty-value filling is needed here.
+     */
+    public void setSafe(int index, ${minor.javaType!type.javaType} value) {
+      bits.getMutator().setSafe(index, 1);
+      values.getMutator().setSafe(index, value);
+      setCount++;
+    }
+
+    </#if>
+
+    /**
+     * Sets the value count on both child vectors; for variable-width types, fillEmpties is
+     * called with {@code valueCount} first.
+     */
+    @Override
+    public void setValueCount(int valueCount) {
+      assert valueCount >= 0;
+      <#if type.major == "VarLen">
+      fillEmpties(valueCount);
+      </#if>
+      values.getMutator().setValueCount(valueCount);
+      bits.getMutator().setValueCount(valueCount);
+    }
+
+    /** Generates test data in both child vectors and then sets the value count. */
+    @Override
+    public void generateTestData(int valueCount){
+      bits.getMutator().generateTestDataAlt(valueCount);
+      values.getMutator().generateTestData(valueCount);
+      <#if type.major == "VarLen">lastSet = valueCount;</#if>
+      setValueCount(valueCount);
+    }
+
+    /** Resets setCount to 0 and, for variable-width types, lastSet to -1. */
+    @Override
+    public void reset(){
+      setCount = 0;
+      <#if type.major == "VarLen">lastSet = -1;</#if>
+    }
+  }
+}
+</#list>
+</#list>

http://git-wip-us.apache.org/repos/asf/drill/blob/9969d8bd/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java b/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
new file mode 100644
index 0000000..2f5ec18
--- /dev/null
+++ b/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
@@ -0,0 +1,427 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.lang.Override;
+
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector;
+import org.mortbay.jetty.servlet.Holder;
+
+<@pp.dropOutputFile />
+<#list vv.types as type>
+<#list type.minor as minor>
+<#assign friendlyType = (minor.friendlyType!minor.boxedType!type.boxedType) />
+<#assign fields = minor.fields!type.fields />
+
+<@pp.changeOutputFile name="/org/apache/drill/exec/vector/Repeated${minor.class}Vector.java" />
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.vector;
+
+<#include "/@includes/vv_imports.ftl" />
+
+/**
+ * Repeated${minor.class} implements a vector with multiple values per row (e.g. JSON array or
+ * repeated protobuf field).  The implementation uses two additional value vectors; one to convert
+ * the index offset to the underlying element offset, and another to store the number of values
+ * in the vector.
+ *
+ * NB: this class is automatically generated from ${.template_name} and ValueVectorTypes.tdd using FreeMarker.
+ */
+
+public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector implements Repeated<#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>VectorLike {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Repeated${minor.class}Vector.class);
+
+  // we maintain local reference to concrete vector type for performance reasons.
+  // Not final: it is assigned in addOrGetVector when the data vector is created.
+  private ${minor.class}Vector values;
+  private final FieldReader reader = new Repeated${minor.class}ReaderImpl(Repeated${minor.class}Vector.this);
+  private final Mutator mutator = new Mutator();
+  private final Accessor accessor = new Accessor();
+
+  /**
+   * Creates the vector and requests its data vector through addOrGetVector, using a
+   * required descriptor of the field's minor type.
+   *
+   * @param field      field definition of this vector
+   * @param allocator  allocator passed on to the base class
+   */
+  public Repeated${minor.class}Vector(MaterializedField field, BufferAllocator allocator) {
+    super(field, allocator);
+    addOrGetVector(VectorDescriptor.create(Types.required(field.getType().getMinorType())));
+  }
+
+  /** Returns this vector's single mutator instance. */
+  @Override
+  public Mutator getMutator() {
+    return mutator;
+  }
+
+  /** Returns this vector's single accessor instance. */
+  @Override
+  public Accessor getAccessor() {
+    return accessor;
+  }
+
+  /** Returns the reader instance created together with this vector. */
+  @Override
+  public FieldReader getReader() {
+    return reader;
+  }
+
+  /** Returns the locally cached, concretely typed data vector. */
+  @Override
+  public ${minor.class}Vector getDataVector() {
+    return values;
+  }
+
+  /** Creates a transfer pair whose target is a new vector built from this vector's field. */
+  @Override
+  public TransferPair getTransferPair() {
+    final MaterializedField targetField = getField();
+    return new TransferImpl(targetField);
+  }
+
+  /**
+   * Creates a transfer pair whose target is a new vector built from this vector's field
+   * with its path replaced by {@code ref}.
+   */
+  @Override
+  public TransferPair getTransferPair(FieldReference ref) {
+    final MaterializedField targetField = getField().withPath(ref);
+    return new TransferImpl(targetField);
+  }
+
+  /**
+   * Creates a transfer pair that targets an existing vector.
+   *
+   * @param to  target vector; it is cast to Repeated${minor.class}Vector
+   */
+  @Override
+  public TransferPair makeTransferPair(ValueVector to) {
+    final Repeated${minor.class}Vector target = (Repeated${minor.class}Vector) to;
+    return new TransferImpl(target);
+  }
+
+  /**
+   * Delegates to the base class and, when that call created a new vector, caches it in
+   * {@code values} so later accesses use the concrete type.
+   */
+  @Override
+  public AddOrGetResult<${minor.class}Vector> addOrGetVector(VectorDescriptor descriptor) {
+    final AddOrGetResult<${minor.class}Vector> result = super.addOrGetVector(descriptor);
+    if (result.isCreated()) {
+      values = result.getVector();
+    }
+    return result;
+  }
+
+  /**
+   * Clears {@code target}, transfers the offsets and the values into it, and then clears
+   * this vector.
+   *
+   * @param target  vector that receives the data
+   */
+  public void transferTo(Repeated${minor.class}Vector target) {
+    target.clear();
+    offsets.transferTo(target.offsets);
+    values.transferTo(target.values);
+    clear();
+  }
+
+  /**
+   * Split-transfers {@code groups} records starting at {@code startIndex} into {@code to}.
+   * The inner values are handed over with the values vector's splitAndTransferTo, while the
+   * target's offsets are re-allocated and rewritten relative to the first transferred value.
+   *
+   * @param startIndex  first record to transfer
+   * @param groups      number of records to transfer
+   * @param to          vector that receives the records
+   */
+  public void splitAndTransferTo(final int startIndex, final int groups, Repeated${minor.class}Vector to) {
+    final UInt4Vector.Accessor sourceOffsets = offsets.getAccessor();
+    final UInt4Vector.Mutator targetOffsets = to.offsets.getMutator();
+
+    final int firstValue = sourceOffsets.get(startIndex);
+    final int lastValue = sourceOffsets.get(startIndex + groups);
+
+    values.splitAndTransferTo(firstValue, lastValue - firstValue, to.values);
+    to.offsets.clear();
+    to.offsets.allocateNew(groups + 1);
+    // Rebase every copied offset so that the first transferred record starts at zero.
+    for (int i = 0; i <= groups; i++) {
+      targetOffsets.set(i, sourceOffsets.get(startIndex + i) - firstValue);
+    }
+    targetOffsets.setValueCount(groups == 0 ? 0 : groups + 1);
+  }
+
+  /**
+   * Transfer pair that moves or copies data from the enclosing vector into a target
+   * Repeated${minor.class}Vector.
+   */
+  private class TransferImpl implements TransferPair {
+    final Repeated${minor.class}Vector to;
+
+    /** Targets a newly created vector for the given field, using the enclosing vector's allocator. */
+    public TransferImpl(MaterializedField field) {
+      this.to = new Repeated${minor.class}Vector(field, allocator);
+    }
+
+    /** Targets an existing vector. */
+    public TransferImpl(Repeated${minor.class}Vector to) {
+      this.to = to;
+    }
+
+    @Override
+    public Repeated${minor.class}Vector getTo() {
+      return to;
+    }
+
+    /** Delegates to the enclosing vector's transferTo. */
+    @Override
+    public void transfer() {
+      transferTo(to);
+    }
+
+    /** Delegates to the enclosing vector's splitAndTransferTo. */
+    @Override
+    public void splitAndTransfer(int startIndex, int length) {
+      splitAndTransferTo(startIndex, length, to);
+    }
+
+    /** Copies one record of the enclosing vector into the target via copyFromSafe. */
+    @Override
+    public void copyValueSafe(int fromIndex, int toIndex) {
+      to.copyFromSafe(fromIndex, toIndex, Repeated${minor.class}Vector.this);
+    }
+  }
+
+  /**
+   * Copies every inner value of record {@code inIndex} of {@code v} into record
+   * {@code outIndex} of this vector, using the mutator's add.
+   *
+   * @param inIndex   record to read in {@code v}
+   * @param outIndex  record to write in this vector
+   * @param v         source vector
+   */
+  public void copyFrom(int inIndex, int outIndex, Repeated${minor.class}Vector v) {
+    final Accessor source = v.getAccessor();
+    final int innerCount = source.getInnerValueCountAt(inIndex);
+    mutator.startNewValue(outIndex);
+    for (int position = 0; position < innerCount; position++) {
+      mutator.add(outIndex, source.get(inIndex, position));
+    }
+  }
+
+  /**
+   * Copies every inner value of record {@code inIndex} of {@code v} into record
+   * {@code outIndex} of this vector, using the mutator's addSafe.
+   *
+   * @param inIndex   record to read in {@code v}
+   * @param outIndex  record to write in this vector
+   * @param v         source vector
+   */
+  public void copyFromSafe(int inIndex, int outIndex, Repeated${minor.class}Vector v) {
+    final Accessor source = v.getAccessor();
+    final int innerCount = source.getInnerValueCountAt(inIndex);
+    mutator.startNewValue(outIndex);
+    for (int position = 0; position < innerCount; position++) {
+      mutator.addSafe(outIndex, source.get(inIndex, position));
+    }
+  }
+
+  /**
+   * Allocates the offsets vector and then the values vector.
+   *
+   * @return true if both allocations succeeded; false otherwise, in which case this vector
+   *         has been cleared
+   */
+  public boolean allocateNewSafe() {
+    // Both child vectors must allocate. If either one fails (or throws), anything that
+    // was already allocated is released again through clear().
+    boolean allocated = false;
+    try {
+      allocated = offsets.allocateNewSafe() && values.allocateNewSafe();
+    } finally {
+      if (!allocated) {
+        clear();
+      }
+    }
+    if (!allocated) {
+      return false;
+    }
+    offsets.zeroVector();
+    mutator.reset();
+    return true;
+  }
+
+  /**
+   * Allocates the offsets and the values vectors, then zeroes the offsets and resets the
+   * mutator. If an OutOfMemoryRuntimeException is thrown while allocating, this vector is
+   * cleared and the exception is rethrown.
+   */
+  @Override
+  public void allocateNew() {
+    try {
+      offsets.allocateNew();
+      values.allocateNew();
+    } catch (OutOfMemoryRuntimeException e) {
+      clear();
+      throw e;
+    }
+    offsets.zeroVector();
+    mutator.reset();
+  }
+
+  <#if type.major == "VarLen">
+  /** Extends the base metadata with the var-byte length reported by the values vector. */
+  @Override
+  protected SerializedField.Builder getMetadataBuilder() {
+    return super.getMetadataBuilder()
+            .setVarByteLength(values.getVarByteLength());
+  }
+
+  /**
+   * Allocates {@code valueCount + 1} offsets and a values vector for {@code totalBytes}
+   * bytes and {@code innerValueCount} entries, then zeroes the offsets and resets the
+   * mutator. If an OutOfMemoryRuntimeException is thrown while allocating, this vector is
+   * cleared and the exception is rethrown.
+   *
+   * @param totalBytes       byte count passed to the values vector
+   * @param valueCount       number of records
+   * @param innerValueCount  entry count passed to the values vector
+   */
+  public void allocateNew(int totalBytes, int valueCount, int innerValueCount) {
+    try {
+      offsets.allocateNew(valueCount + 1);
+      values.allocateNew(totalBytes, innerValueCount);
+    } catch (OutOfMemoryRuntimeException e) {
+      clear();
+      throw e;
+    }
+    offsets.zeroVector();
+    mutator.reset();
+  }
+
+  /** Returns the byte capacity reported by the values vector. */
+  public int getByteCapacity() {
+    return values.getByteCapacity();
+  }
+
+  <#else>
+
+  /**
+   * Clears this vector, then allocates {@code valueCount + 1} offsets and
+   * {@code innerValueCount} values. Afterwards the offsets are zeroed and the mutator is
+   * reset. If an OutOfMemoryRuntimeException is thrown while allocating, this vector is
+   * cleared again, releasing anything already allocated, and the exception is rethrown.
+   *
+   * @param valueCount       number of records
+   * @param innerValueCount  entry count passed to the values vector
+   */
+  @Override
+  public void allocateNew(int valueCount, int innerValueCount) {
+    clear();
+    try {
+      offsets.allocateNew(valueCount + 1);
+      values.allocateNew(innerValueCount);
+    } catch(OutOfMemoryRuntimeException e){
+      clear();
+      throw e;
+    }
+    offsets.zeroVector();
+    mutator.reset();
+  }
+
+  </#if>
+
+  // This is declared a subclass of the accessor declared inside of FixedWidthVector, this is also used for
+  // variable length vectors, as they should have consistent interface as much as possible, if they need to diverge
+  // in the future, the interface should be declared in the respective value vector superclasses for fixed and variable
+  // and we should refer to each in the generation template
+  public final class Accessor extends BaseRepeatedValueVector.BaseRepeatedAccessor {
+    /** Returns the inner values of record {@code index} as a list of objects. */
+    @Override
+    public List<${friendlyType}> getObject(int index) {
+      final List<${friendlyType}> result = new JsonStringArrayList<>();
+      final UInt4Vector.Accessor offsetReader = offsets.getAccessor();
+      final int first = offsetReader.get(index);
+      final int limit = offsetReader.get(index + 1);
+      final ${minor.class}Vector.Accessor valueReader = values.getAccessor();
+      for (int position = first; position < limit; position++) {
+        result.add(valueReader.getObject(position));
+      }
+      return result;
+    }
+
+    /** Returns the object at {@code arrayIndex} within record {@code index}. */
+    public ${friendlyType} getSingleObject(int index, int arrayIndex) {
+      final int recordStart = offsets.getAccessor().get(index);
+      return values.getAccessor().getObject(recordStart + arrayIndex);
+    }
+
+    /**
+     * Get a value for the given record.  Each element in the repeated field is accessed by
+     * the positionIndex param.
+     *
+     * @param  index           record containing the repeated field
+     * @param  positionIndex   position within the repeated field
+     * @return element at the given position in the given record
+     */
+    public <#if type.major == "VarLen">byte[]<#else>${minor.javaType!type.javaType}</#if> get(int index, int positionIndex) {
+      final int recordStart = offsets.getAccessor().get(index);
+      return values.getAccessor().get(recordStart + positionIndex);
+    }
+
+    /** Points the holder at record {@code index}: its start and end offsets and the values vector. */
+    public void get(int index, Repeated${minor.class}Holder holder) {
+      holder.start = offsets.getAccessor().get(index);
+      holder.end = offsets.getAccessor().get(index + 1);
+      holder.vector = values;
+    }
+
+    /** Reads the element at {@code positionIndex} of record {@code index} into the holder. */
+    public void get(int index, int positionIndex, ${minor.class}Holder holder) {
+      final int recordStart = offsets.getAccessor().get(index);
+      assert recordStart >= 0;
+      assert positionIndex < getInnerValueCountAt(index);
+      values.getAccessor().get(recordStart + positionIndex, holder);
+    }
+
+    /**
+     * Reads the element at {@code positionIndex} of record {@code index} into the holder.
+     * A position at or beyond the record's inner value count only sets {@code holder.isSet}
+     * to 0.
+     */
+    public void get(int index, int positionIndex, Nullable${minor.class}Holder holder) {
+      final int recordStart = offsets.getAccessor().get(index);
+      assert recordStart >= 0;
+      if (positionIndex < getInnerValueCountAt(index)) {
+        values.getAccessor().get(recordStart + positionIndex, holder);
+      } else {
+        holder.isSet = 0;
+      }
+    }
+  }
+
+  public final class Mutator extends BaseRepeatedValueVector.BaseRepeatedMutator implements RepeatedMutator {
+    // Private: the enclosing vector creates the single instance it hands out.
+    private Mutator() {}
+
+    /**
+     * Add an element to the given record index.  This is similar to the set() method in other
+     * value vectors, except that it permits setting multiple values for a single record.
+     * The value is written at the record's current end offset, which is then advanced by one.
+     *
+     * @param index   record of the element to add
+     * @param value   value to add to the given row
+     */
+    public void add(int index, <#if type.major == "VarLen">byte[]<#elseif (type.width < 4)>int<#else>${minor.javaType!type.javaType}</#if> value) {
+      final int slot = offsets.getAccessor().get(index + 1);
+      values.getMutator().set(slot, value);
+      offsets.getMutator().set(index + 1, slot + 1);
+    }
+
+    <#if type.major == "VarLen">
+    /** Appends the entire byte array as one element of the record at {@code index}. */
+    public void addSafe(int index, byte[] bytes) {
+      final int length = bytes.length;
+      addSafe(index, bytes, 0, length);
+    }
+
+    /**
+     * Appends a byte range as one element of the record at {@code index}, using the
+     * setSafe variants of the inner values and offsets mutators.
+     *
+     * @param index   record that receives the element
+     * @param bytes   source array
+     * @param start   first byte of the range within {@code bytes}
+     * @param length  number of bytes in the range
+     */
+    public void addSafe(int index, byte[] bytes, int start, int length) {
+      final int writePosition = offsets.getAccessor().get(index + 1);
+      values.getMutator().setSafe(writePosition, bytes, start, length);
+      offsets.getMutator().setSafe(index + 1, writePosition + 1);
+    }
+
+    <#else>
+
+    /**
+     * Appends one value to the record at {@code index}, using the setSafe variants of the
+     * inner values and offsets mutators.
+     *
+     * @param index     record that receives the element
+     * @param srcValue  value to append
+     */
+    public void addSafe(int index, ${minor.javaType!type.javaType} srcValue) {
+      final int writePosition = offsets.getAccessor().get(index + 1);
+      values.getMutator().setSafe(writePosition, srcValue);
+      offsets.getMutator().setSafe(index + 1, writePosition + 1);
+    }
+
+    </#if>
+
+    /**
+     * Starts a new value at {@code index} and copies every element in the holder's
+     * [start, end) range into it, one element at a time.
+     *
+     * @param index  record to write
+     * @param h      holder describing the source vector and element range
+     */
+    public void setSafe(int index, Repeated${minor.class}Holder h) {
+      final ${minor.class}Holder element = new ${minor.class}Holder();
+      final ${minor.class}Vector.Accessor source = h.vector.getAccessor();
+      mutator.startNewValue(index);
+      for (int pos = h.start; pos < h.end; pos++) {
+        source.get(pos, element);
+        mutator.addSafe(index, element);
+      }
+    }
+
+    /** Appends the holder's value to the record at {@code index} via the setSafe mutators. */
+    public void addSafe(int index, ${minor.class}Holder holder) {
+      final int writePosition = offsets.getAccessor().get(index + 1);
+      values.getMutator().setSafe(writePosition, holder);
+      offsets.getMutator().setSafe(index + 1, writePosition + 1);
+    }
+
+    /** Appends the nullable holder's value to the record at {@code index} via the setSafe mutators. */
+    public void addSafe(int index, Nullable${minor.class}Holder holder) {
+      final int writePosition = offsets.getAccessor().get(index + 1);
+      values.getMutator().setSafe(writePosition, holder);
+      offsets.getMutator().setSafe(index + 1, writePosition + 1);
+    }
+
+    <#if (fields?size > 1) && !(minor.class == "Decimal9" || minor.class == "Decimal18" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense")>
+    /** Appends a value given as its individual fields to the record at {@code arrayIndex}. */
+    public void addSafe(int arrayIndex, <#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>) {
+      final int writePosition = offsets.getAccessor().get(arrayIndex + 1);
+      values.getMutator().setSafe(writePosition, <#list fields as field>${field.name}<#if field_has_next>, </#if></#list>);
+      offsets.getMutator().setSafe(arrayIndex + 1, writePosition + 1);
+    }
+    </#if>
+
+    protected void add(int index, ${minor.class}Holder holder) {
+      int nextOffset = offsets.getAccessor().get(index+1);
+      values.getMutator().set(nextOffset, holder);
+      offsets.getMutator().set(index+1, nextOffset+1);
+    }
+
+    /**
+     * Appends every element in the holder's [start, end) range to the record at {@code index}.
+     *
+     * @param index   record that receives the elements
+     * @param holder  holder describing the source vector and element range
+     */
+    public void add(int index, Repeated${minor.class}Holder holder) {
+      final ${minor.class}Vector.Accessor source = holder.vector.getAccessor();
+      final ${minor.class}Holder element = new ${minor.class}Holder();
+      int pos = holder.start;
+      while (pos < holder.end) {
+        source.get(pos, element);
+        add(index, element);
+        pos++;
+      }
+    }
+
+    /**
+     * Fills the vector with test data.  Record lengths cycle through 1, 2, 0, 6; the inner
+     * values vector is asked for valCount * 9 test values.
+     *
+     * @param valCount  number of records to generate
+     */
+    @Override
+    public void generateTestData(final int valCount) {
+      final int[] pattern = {1, 2, 0, 6};
+      final UInt4Vector.Mutator offsetsMutator = offsets.getMutator();
+      int total = 0;
+      int written = 0;
+      while (written < valCount) {
+        total += pattern[written % pattern.length];
+        // offsets entry (record + 1) holds the end of that record
+        offsetsMutator.set(written + 1, total);
+        written++;
+      }
+      values.getMutator().generateTestData(valCount * 9);
+      setValueCount(written);
+    }
+
+    /** Intentionally empty: this mutator does nothing on reset. */
+    @Override
+    public void reset() {
+    }
+  }
+}
+</#list>
+</#list>

http://git-wip-us.apache.org/repos/asf/drill/blob/9969d8bd/exec/vector/src/main/codegen/templates/TypeHelper.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/TypeHelper.java b/exec/vector/src/main/codegen/templates/TypeHelper.java
new file mode 100644
index 0000000..b8178cb
--- /dev/null
+++ b/exec/vector/src/main/codegen/templates/TypeHelper.java
@@ -0,0 +1,600 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.drill.exec.vector.complex.UnionVector;
+
+<@pp.dropOutputFile />
+<@pp.changeOutputFile name="/org/apache/drill/exec/expr/TypeHelper.java" />
+
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.expr;
+
+<#include "/@includes/vv_imports.ftl" />
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.accessor.*;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.apache.drill.exec.util.CallBack;
+
+public class TypeHelper {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TypeHelper.class);
+
+  private static final int WIDTH_ESTIMATE = 50;
+
+  // Default length when casting to varchar : 65536 = 2^16
+  // This only defines an absolute maximum for values, setting
+  // a high value like this will not inflate the size for small values
+  public static final int VARCHAR_DEFAULT_CAST_LEN = 65536;
+
+  private static String buildErrorMessage(final String operation, final MinorType type, final DataMode mode) {
+    return String.format("Unable to %s for minor type [%s] and mode [%s]", operation, type, mode);
+  }
+
+  private static String buildErrorMessage(final String operation, final MajorType type) {
+    return buildErrorMessage(operation, type.getMinorType(), type.getMode());
+  }
+
+  /**
+   * Returns a size for the given type.  Generated minor types return their template width,
+   * plus {@code WIDTH_ESTIMATE} when the minor class name begins with "Var", "PRO" or "MSG".
+   * FIXEDCHAR, FIXED16CHAR and FIXEDBINARY return the width carried by the type itself.
+   *
+   * @param major  type to size
+   * @return the size for the type
+   * @throws UnsupportedOperationException if the minor type has no case here
+   */
+  public static int getSize(MajorType major) {
+    switch (major.getMinorType()) {
+<#list vv.types as type>
+  <#list type.minor as minor>
+    case ${minor.class?upper_case}:
+      return ${type.width}<#if minor.class?starts_with("Var") ||
+                               minor.class?starts_with("PRO") ||
+                               minor.class?starts_with("MSG")> + WIDTH_ESTIMATE</#if>;
+  </#list>
+</#list>
+      case FIXEDCHAR:
+      case FIXED16CHAR:
+      case FIXEDBINARY:
+        return major.getWidth();
+    }
+    throw new UnsupportedOperationException(buildErrorMessage("get size", major));
+  }
+
+  /**
+   * Creates the SqlAccessor matching a vector's minor type and data mode.
+   * UNION vectors get a UnionSqlAccessor; MAP, LIST and every REPEATED generated type
+   * get a GenericAccessor.
+   *
+   * @param vector  vector to wrap
+   * @return accessor for the vector
+   * @throws UnsupportedOperationException if the minor type has no case here
+   */
+  public static SqlAccessor getSqlAccessor(ValueVector vector){
+    final MajorType majorType = vector.getField().getType();
+    switch (majorType.getMinorType()) {
+    case UNION:
+      return new UnionSqlAccessor((UnionVector) vector);
+    <#list vv.types as type>
+    <#list type.minor as minor>
+    case ${minor.class?upper_case}:
+      switch (majorType.getMode()) {
+        case REQUIRED:
+          return new ${minor.class}Accessor((${minor.class}Vector) vector);
+        case OPTIONAL:
+          return new Nullable${minor.class}Accessor((Nullable${minor.class}Vector) vector);
+        case REPEATED:
+          return new GenericAccessor(vector);
+      }
+    </#list>
+    </#list>
+    case MAP:
+    case LIST:
+      return new GenericAccessor(vector);
+    }
+    throw new UnsupportedOperationException(buildErrorMessage("find sql accessor", majorType));
+  }
+  
+  /**
+   * Creates a vector for a child field named {@code name} under {@code parentPath}.
+   *
+   * @param parentPath  path of the parent
+   * @param name        child name appended to the parent path
+   * @param allocator   allocator handed to the new vector
+   * @param type        type of the new field
+   * @param callback    callback handed to the new vector
+   * @return the newly created vector
+   */
+  public static ValueVector getNewVector(SchemaPath parentPath, String name, BufferAllocator allocator, MajorType type, CallBack callback){
+    final MaterializedField field = MaterializedField.create(parentPath.getChild(name), type);
+    return getNewVector(field, allocator, callback);
+  }
+  
+  
+  /**
+   * Returns the ValueVector class used for a minor type / data mode combination.
+   *
+   * Note on structure: none of the inner mode switches is followed by a break.  Each of
+   * them returns for every mode label it lists, so control only continues into the next
+   * case when the mode matches none of the listed labels.
+   *
+   * @param type  minor type to look up
+   * @param mode  data mode to look up
+   * @return the vector class for the combination
+   * @throws UnsupportedOperationException if the minor type has no case here
+   */
+  public static Class<?> getValueVectorClass(MinorType type, DataMode mode){
+    switch (type) {
+    case UNION:
+      return UnionVector.class;
+    case MAP:
+      switch (mode) {
+      case OPTIONAL:
+      case REQUIRED:
+        return MapVector.class;
+      case REPEATED:
+        return RepeatedMapVector.class;
+      }
+      
+    case LIST:
+      switch (mode) {
+      case REPEATED:
+        return RepeatedListVector.class;
+      case REQUIRED:
+      case OPTIONAL:
+        return ListVector.class;
+      }
+    
+<#list vv.types as type>
+  <#list type.minor as minor>
+      case ${minor.class?upper_case}:
+        switch (mode) {
+          case REQUIRED:
+            return ${minor.class}Vector.class;
+          case OPTIONAL:
+            return Nullable${minor.class}Vector.class;
+          case REPEATED:
+            return Repeated${minor.class}Vector.class;
+        }
+  </#list>
+</#list>
+    case GENERIC_OBJECT      :
+      return ObjectVector.class  ;
+    default:
+      break;
+    }
+    throw new UnsupportedOperationException(buildErrorMessage("get value vector class", type, mode));
+  }
+  public static Class<?> getReaderClassName( MinorType type, DataMode mode, boolean isSingularRepeated){
+    switch (type) {
+    case MAP:
+      switch (mode) {
+      case REQUIRED:
+        if (!isSingularRepeated)
+          return SingleMapReaderImpl.class;
+        else
+          return SingleLikeRepeatedMapReaderImpl.class;
+      case REPEATED: 
+          return RepeatedMapReaderImpl.class;
+      }
+    case LIST:
+      switch (mode) {
+      case REQUIRED:
+        return SingleListReaderImpl.class;
+      case REPEATED:
+        return RepeatedListReaderImpl.class;
+      }
+      
+<#list vv.types as type>
+  <#list type.minor as minor>
+      case ${minor.class?upper_case}:
+        switch (mode) {
+          case REQUIRED:
+            return ${minor.class}ReaderImpl.class;
+          case OPTIONAL:
+            return Nullable${minor.class}ReaderImpl.class;
+          case REPEATED:
+            return Repeated${minor.class}ReaderImpl.class;
+        }
+  </#list>
+</#list>
+      default:
+        break;
+      }
+      throw new UnsupportedOperationException(buildErrorMessage("get reader class name", type, mode));
+  }
+  
+  /**
+   * Returns the writer interface for a minor type.  The mode does not influence the
+   * result; it is only used when building the error message.
+   *
+   * @param type  minor type to look up
+   * @param mode  data mode, reported in the error message only
+   * @return the writer interface class
+   * @throws UnsupportedOperationException if the minor type has no case here
+   */
+  public static Class<?> getWriterInterface( MinorType type, DataMode mode){
+    switch (type) {
+    case UNION: return UnionWriter.class;
+    case MAP: return MapWriter.class;
+    case LIST: return ListWriter.class;
+<#list vv.types as type>
+  <#list type.minor as minor>
+      case ${minor.class?upper_case}: return ${minor.class}Writer.class;
+  </#list>
+</#list>
+      default:
+        break;
+      }
+      throw new UnsupportedOperationException(buildErrorMessage("get writer interface", type, mode));
+  }
+  
+  /**
+   * Returns the writer implementation class for a minor type / data mode combination.
+   *
+   * The inner mode switches are not followed by a break; each returns for every mode label
+   * it lists, so control only continues into the next case when no label matches.
+   *
+   * @param type  minor type to look up
+   * @param mode  data mode to look up
+   * @return the writer implementation class
+   * @throws UnsupportedOperationException if the minor type has no case here
+   */
+  public static Class<?> getWriterImpl( MinorType type, DataMode mode){
+    switch (type) {
+    case UNION:
+      return UnionWriter.class;
+    case MAP:
+      switch (mode) {
+      case REQUIRED:
+      case OPTIONAL:
+        return SingleMapWriter.class;
+      case REPEATED:
+        return RepeatedMapWriter.class;
+      }
+    case LIST:
+      switch (mode) {
+      case REQUIRED:
+      case OPTIONAL:
+        return UnionListWriter.class;
+      case REPEATED:
+        return RepeatedListWriter.class;
+      }
+      
+<#list vv.types as type>
+  <#list type.minor as minor>
+      case ${minor.class?upper_case}:
+        switch (mode) {
+          case REQUIRED:
+            return ${minor.class}WriterImpl.class;
+          case OPTIONAL:
+            return Nullable${minor.class}WriterImpl.class;
+          case REPEATED:
+            return Repeated${minor.class}WriterImpl.class;
+        }
+  </#list>
+</#list>
+      default:
+        break;
+      }
+      throw new UnsupportedOperationException(buildErrorMessage("get writer implementation", type, mode));
+  }
+
+  /**
+   * Returns the holder-reader implementation class for a generated minor type and mode.
+   * Only the template-generated minor types have cases; anything else throws.
+   *
+   * @param type  minor type to look up
+   * @param mode  data mode to look up
+   * @return the holder reader implementation class
+   * @throws UnsupportedOperationException if the minor type has no case here
+   */
+  public static Class<?> getHolderReaderImpl( MinorType type, DataMode mode){
+    switch (type) {      
+<#list vv.types as type>
+  <#list type.minor as minor>
+      case ${minor.class?upper_case}:
+        switch (mode) {
+          case REQUIRED:
+            return ${minor.class}HolderReaderImpl.class;
+          case OPTIONAL:
+            return Nullable${minor.class}HolderReaderImpl.class;
+          case REPEATED:
+            return Repeated${minor.class}HolderReaderImpl.class;
+        }
+  </#list>
+</#list>
+      default:
+        break;
+      }
+      throw new UnsupportedOperationException(buildErrorMessage("get holder reader implementation", type, mode));
+  }
+  
+  /**
+   * Returns the code-model reference to the holder class for a minor type / data mode.
+   * UNION maps to UnionHolder, MAP and LIST share ComplexHolder, and GENERIC_OBJECT maps
+   * to ObjectHolder, all regardless of mode.
+   *
+   * @param model  code model used to create the class reference
+   * @param type   minor type to look up
+   * @param mode   data mode to look up
+   * @return reference to the holder class
+   * @throws UnsupportedOperationException if the minor type has no case here
+   */
+  public static JType getHolderType(JCodeModel model, MinorType type, DataMode mode){
+    switch (type) {
+    case UNION:
+      return model._ref(UnionHolder.class);
+    case MAP:
+    case LIST:
+      return model._ref(ComplexHolder.class);
+      
+<#list vv.types as type>
+  <#list type.minor as minor>
+      case ${minor.class?upper_case}:
+        switch (mode) {
+          case REQUIRED:
+            return model._ref(${minor.class}Holder.class);
+          case OPTIONAL:
+            return model._ref(Nullable${minor.class}Holder.class);
+          case REPEATED:
+            return model._ref(Repeated${minor.class}Holder.class);
+        }
+  </#list>
+</#list>
+      case GENERIC_OBJECT:
+        return model._ref(ObjectHolder.class);
+      default:
+        break;
+      }
+      throw new UnsupportedOperationException(buildErrorMessage("get holder type", type, mode));
+  }
+
+  /** Creates a vector for the field without a callback (a null CallBack is passed on). */
+  public static ValueVector getNewVector(MaterializedField field, BufferAllocator allocator){
+    final CallBack noCallBack = null;
+    return getNewVector(field, allocator, noCallBack);
+  }
+  public static ValueVector getNewVector(MaterializedField field, BufferAllocator allocator, CallBack callBack){
+    MajorType type = field.getType();
+
+    switch (type.getMinorType()) {
+    
+    case UNION:
+      return new UnionVector(field, allocator, callBack);
+
+    case MAP:
+      switch (type.getMode()) {
+      case REQUIRED:
+      case OPTIONAL:
+        return new MapVector(field, allocator, callBack);
+      case REPEATED:
+        return new RepeatedMapVector(field, allocator, callBack);
+      }
+    case LIST:
+      switch (type.getMode()) {
+      case REPEATED:
+        return new RepeatedListVector(field, allocator, callBack);
+      case OPTIONAL:
+      case REQUIRED:
+        return new ListVector(field, allocator, callBack);
+      }
+<#list vv.  types as type>
+  <#list type.minor as minor>
+    case ${minor.class?upper_case}:
+      switch (type.getMode()) {
+        case REQUIRED:
+          return new ${minor.class}Vector(field, allocator);
+        case OPTIONAL:
+          return new Nullable${minor.class}Vector(field, allocator);
+        case REPEATED:
+          return new Repeated${minor.class}Vector(field, allocator);
+      }
+  </#list>
+</#list>
+    case GENERIC_OBJECT:
+      return new ObjectVector(field, allocator)        ;
+    default:
+      break;
+    }
+    // All ValueVector types have been handled.
+    throw new UnsupportedOperationException(buildErrorMessage("get new vector", type));
+  }
+
+  /**
+   * Reads the value at {@code index} of a vector into a freshly created holder.
+   *
+   * REQUIRED vectors yield a plain holder.  OPTIONAL vectors yield a nullable holder whose
+   * isSet flag is copied from the vector and whose value is only read when isSet is 1.
+   * GENERIC_OBJECT vectors yield an ObjectHolder carrying the stored object.
+   *
+   * @param vector  vector to read from
+   * @param index   position to read
+   * @return holder containing the value
+   * @throws UnsupportedOperationException for minor types without a case here and for
+   *         modes other than REQUIRED and OPTIONAL
+   */
+  public static ValueHolder getValue(ValueVector vector, int index) {
+    MajorType type = vector.getField().getType();
+    ValueHolder holder;
+    switch(type.getMinorType()) {
+<#list vv.types as type>
+  <#list type.minor as minor>
+    case ${minor.class?upper_case} :
+      <#if minor.class?starts_with("Var") || minor.class == "IntervalDay" || minor.class == "Interval" ||
+        minor.class?starts_with("Decimal28") ||  minor.class?starts_with("Decimal38")>
+        switch (type.getMode()) {
+          case REQUIRED:
+            holder = new ${minor.class}Holder();
+            ((${minor.class}Vector) vector).getAccessor().get(index, (${minor.class}Holder)holder);
+            return holder;
+          case OPTIONAL:
+            holder = new Nullable${minor.class}Holder();
+            ((Nullable${minor.class}Holder)holder).isSet = ((Nullable${minor.class}Vector) vector).getAccessor().isSet(index);
+            if (((Nullable${minor.class}Holder)holder).isSet == 1) {
+              ((Nullable${minor.class}Vector) vector).getAccessor().get(index, (Nullable${minor.class}Holder)holder);
+            }
+            return holder;
+        }
+      <#else>
+      switch (type.getMode()) {
+        case REQUIRED:
+          holder = new ${minor.class}Holder();
+          ((${minor.class}Holder)holder).value = ((${minor.class}Vector) vector).getAccessor().get(index);
+          return holder;
+        case OPTIONAL:
+          holder = new Nullable${minor.class}Holder();
+          ((Nullable${minor.class}Holder)holder).isSet = ((Nullable${minor.class}Vector) vector).getAccessor().isSet(index);
+          if (((Nullable${minor.class}Holder)holder).isSet == 1) {
+            ((Nullable${minor.class}Holder)holder).value = ((Nullable${minor.class}Vector) vector).getAccessor().get(index);
+          }
+          return holder;
+      }
+      </#if>
+      // Mode not handled above (e.g. REPEATED): leave the switch and throw below rather
+      // than falling through into the cases of unrelated types.
+      break;
+  </#list>
+</#list>
+    case GENERIC_OBJECT:
+      holder = new ObjectHolder();
+      ((ObjectHolder)holder).obj = ((ObjectVector) vector).getAccessor().getObject(index);
+      // Return the populated holder; a break here would run into the throw below.
+      return holder;
+    }
+
+    throw new UnsupportedOperationException(buildErrorMessage("get value", type));
+  }
+
+  /**
+   * Writes a holder's value into a vector at {@code index} using the mutator's setSafe.
+   *
+   * For OPTIONAL vectors the write only happens when the holder's isSet flag is 1; an
+   * unset holder leaves the vector untouched.
+   *
+   * @param vector  vector to write to
+   * @param index   position to write
+   * @param holder  holder whose concrete class must match the vector's type and mode
+   * @throws UnsupportedOperationException for minor types without a case here and for
+   *         modes other than REQUIRED and OPTIONAL
+   */
+  public static void setValue(ValueVector vector, int index, ValueHolder holder) {
+    MajorType type = vector.getField().getType();
+
+    switch(type.getMinorType()) {
+<#list vv.types as type>
+  <#list type.minor as minor>
+    case ${minor.class?upper_case} :
+      switch (type.getMode()) {
+        case REQUIRED:
+          ((${minor.class}Vector) vector).getMutator().setSafe(index, (${minor.class}Holder) holder);
+          return;
+        case OPTIONAL:
+          if (((Nullable${minor.class}Holder) holder).isSet == 1) {
+            ((Nullable${minor.class}Vector) vector).getMutator().setSafe(index, (Nullable${minor.class}Holder) holder);
+          }
+          return;
+      }
+      // Mode not handled above (e.g. REPEATED): leave the switch and throw below rather
+      // than falling through into the cases (and casts) of unrelated types.
+      break;
+  </#list>
+</#list>
+    case GENERIC_OBJECT:
+      ((ObjectVector) vector).getMutator().setSafe(index, (ObjectHolder) holder);
+      return;
+    default:
+      break;
+    }
+    throw new UnsupportedOperationException(buildErrorMessage("set value", type));
+  }
+
+  /**
+   * Writes a holder's value into a vector at {@code index} using the mutator's setSafe.
+   *
+   * For OPTIONAL vectors with an unset holder, the mutator's isSafe(index) is invoked and
+   * its result discarded.
+   * NOTE(review): that call does not appear to write anything — confirm whether a null
+   * should be recorded at {@code index} instead.
+   *
+   * @param vector  vector to write to
+   * @param index   position to write
+   * @param holder  holder whose concrete class must match the vector's type and mode
+   * @throws UnsupportedOperationException for minor types without a case here and for
+   *         modes other than REQUIRED and OPTIONAL
+   */
+  public static void setValueSafe(ValueVector vector, int index, ValueHolder holder) {
+    MajorType type = vector.getField().getType();
+
+    switch(type.getMinorType()) {
+      <#list vv.types as type>
+      <#list type.minor as minor>
+      case ${minor.class?upper_case} :
+      switch (type.getMode()) {
+        case REQUIRED:
+          ((${minor.class}Vector) vector).getMutator().setSafe(index, (${minor.class}Holder) holder);
+          return;
+        case OPTIONAL:
+          if (((Nullable${minor.class}Holder) holder).isSet == 1) {
+            ((Nullable${minor.class}Vector) vector).getMutator().setSafe(index, (Nullable${minor.class}Holder) holder);
+          } else {
+            ((Nullable${minor.class}Vector) vector).getMutator().isSafe(index);
+          }
+          return;
+      }
+      // Mode not handled above (e.g. REPEATED): leave the switch and throw below rather
+      // than falling through into the cases (and casts) of unrelated types.
+      break;
+      </#list>
+      </#list>
+      case GENERIC_OBJECT:
+        ((ObjectVector) vector).getMutator().setSafe(index, (ObjectHolder) holder);
+        // The write succeeded; return instead of falling into the default branch.
+        return;
+      default:
+        break;
+    }
+    throw new UnsupportedOperationException(buildErrorMessage("set value safe", type));
+  }
+
+  /**
+   * Compares one value from each of two vectors.  Vectors of different minor types are
+   * never equal.  Otherwise both vectors are cast to the non-nullable vector class of the
+   * minor type and the results of their accessors' get(int) are compared with {@code ==}.
+   * NOTE(review): for accessors whose get(int) returns an object reference, {@code ==}
+   * would be an identity comparison — confirm this is intended.
+   *
+   * @param v1       first vector
+   * @param v1index  position in the first vector
+   * @param v2       second vector
+   * @param v2index  position in the second vector
+   * @return true when the minor types match and the two values compare equal with ==
+   */
+  public static boolean compareValues(ValueVector v1, int v1index, ValueVector v2, int v2index) {
+    final MinorType minorType1 = v1.getField().getType().getMinorType();
+    final MinorType minorType2 = v2.getField().getType().getMinorType();
+
+    if (minorType1 != minorType2) {
+      return false;
+    }
+
+    switch (minorType1) {
+<#list vv.types as type>
+  <#list type.minor as minor>
+    case ${minor.class?upper_case} :
+      return ((${minor.class}Vector) v1).getAccessor().get(v1index) ==
+             ((${minor.class}Vector) v2).getAccessor().get(v2index);
+  </#list>
+</#list>
+    default:
+      return false;
+    }
+  }
+
+  /**
+   * Creates an empty ValueHolder matching the given MajorType: a plain, Nullable or
+   * Repeated holder depending on the type's mode, or an ObjectHolder for GENERIC_OBJECT.
+   *
+   * @param type  major type (minor type and mode) the holder is created for
+   * @return a new holder instance
+   * @throws UnsupportedOperationException if the minor type has no case here
+   */
+  public static ValueHolder createValueHolder(MajorType type) {
+    switch(type.getMinorType()) {
+      <#list vv.types as type>
+      <#list type.minor as minor>
+      case ${minor.class?upper_case} :
+
+        switch (type.getMode()) {
+          case REQUIRED:
+            return new ${minor.class}Holder();
+          case OPTIONAL:
+            return new Nullable${minor.class}Holder();
+          case REPEATED:
+            return new Repeated${minor.class}Holder();
+        }
+      </#list>
+      </#list>
+      case GENERIC_OBJECT:
+        return new ObjectHolder();
+      default:
+        throw new UnsupportedOperationException(buildErrorMessage("create value holder", type));
+    }
+  }
+
+  /**
+   * Reports whether a holder is considered null.  The holder's type is resolved through
+   * getValueHolderType.  OPTIONAL holders are null when their isSet flag is 0; REQUIRED
+   * and REPEATED both return true.
+   * NOTE(review): returning true for REQUIRED looks inverted — confirm against callers
+   * before relying on or changing it.
+   *
+   * @param holder  holder to inspect
+   * @return the null-ness as described above
+   * @throws UnsupportedOperationException if the holder's minor type has no case here
+   */
+  public static boolean isNull(ValueHolder holder) {
+    MajorType type = TypeHelper.getValueHolderType(holder);
+
+    switch(type.getMinorType()) {
+      <#list vv.types as type>
+      <#list type.minor as minor>
+      case ${minor.class?upper_case} :
+
+      switch (type.getMode()) {
+        case REQUIRED:
+          return true;
+        case OPTIONAL:
+          return ((Nullable${minor.class}Holder) holder).isSet == 0;
+        case REPEATED:
+          return true;
+      }
+      </#list>
+      </#list>
+      default:
+        throw new UnsupportedOperationException(buildErrorMessage("check is null", type));
+    }
+  }
+
+  public static ValueHolder deNullify(ValueHolder holder) {
+    MajorType type = TypeHelper.getValueHolderType(holder);
+
+    switch(type.getMinorType()) {
+      <#list vv.types as type>
+      <#list type.minor as minor>
+      case ${minor.class?upper_case} :
+
+        switch (type.getMode()) {
+          case REQUIRED:
+            return holder;
+          case OPTIONAL:
+            if( ((Nullable${minor.class}Holder) holder).isSet == 1) {
+              ${minor.class}Holder newHolder = new ${minor.class}Holder();
+
+              <#assign fields = minor.fields!type.fields />
+              <#list fields as field>
+              newHolder.${field.name} = ((Nullable${minor.class}Holder) holder).${field.name};
+              </#list>
+
+              return newHolder;
+            } else {
+              throw new UnsupportedOperationException("You can not convert a null value into a non-null value!");
+            }
+          case REPEATED:
+            return holder;
+        }
+      </#list>
+      </#list>
+      default:
+        throw new UnsupportedOperationException(buildErrorMessage("deNullify", type));
+    }
+  }
+
+  /**
+   * Converts a non-nullable holder into its nullable counterpart: a new Nullable holder is
+   * created with isSet = 1 and every template field copied over.  OPTIONAL holders are
+   * returned unchanged.
+   *
+   * @param holder  holder to convert
+   * @return the same holder, or a new nullable holder carrying the same field values
+   * @throws UnsupportedOperationException for REPEATED holders, or if the holder's minor
+   *         type has no case here
+   */
+  public static ValueHolder nullify(ValueHolder holder) {
+    MajorType type = TypeHelper.getValueHolderType(holder);
+
+    switch(type.getMinorType()) {
+      <#list vv.types as type>
+      <#list type.minor as minor>
+      case ${minor.class?upper_case} :
+        switch (type.getMode()) {
+          case REQUIRED:
+            Nullable${minor.class}Holder newHolder = new Nullable${minor.class}Holder();
+            newHolder.isSet = 1;
+            <#assign fields = minor.fields!type.fields />
+            <#list fields as field>
+            newHolder.${field.name} = ((${minor.class}Holder) holder).${field.name};
+            </#list>
+            return newHolder;
+          case OPTIONAL:
+            return holder;
+          case REPEATED:
+            throw new UnsupportedOperationException("You can not convert repeated type " + type + " to nullable type!");
+        }
+      </#list>
+      </#list>
+      default:
+        throw new UnsupportedOperationException(buildErrorMessage("nullify", type));
+    }
+  }
+
+  /**
+   * Returns the TYPE constant of the holder's concrete class.  Only the plain and Nullable
+   * holders of the generated minor types are recognised; any other holder throws.
+   *
+   * @param holder  holder to inspect
+   * @return the MajorType declared by the holder class
+   * @throws UnsupportedOperationException if the holder matches none of the generated branches
+   */
+  public static MajorType getValueHolderType(ValueHolder holder) {
+
+    // This condition is never true; it only exists so that every generated branch below
+    // can uniformly begin with "else if".
+    if (0 == 1) {
+      return null;
+    }
+    <#list vv.types as type>
+    <#list type.minor as minor>
+      else if (holder instanceof ${minor.class}Holder) {
+        return ((${minor.class}Holder) holder).TYPE;
+      } else if (holder instanceof Nullable${minor.class}Holder) {
+      return ((Nullable${minor.class}Holder) holder).TYPE;
+      }
+    </#list>
+    </#list>
+
+    throw new UnsupportedOperationException("ValueHolder is not supported for 'getValueHolderType' method.");
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/9969d8bd/exec/vector/src/main/codegen/templates/UnionListWriter.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/UnionListWriter.java b/exec/vector/src/main/codegen/templates/UnionListWriter.java
new file mode 100644
index 0000000..c676769
--- /dev/null
+++ b/exec/vector/src/main/codegen/templates/UnionListWriter.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.lang.UnsupportedOperationException;
+
+<@pp.dropOutputFile />
+<@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/impl/UnionListWriter.java" />
+
+
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.vector.complex.impl;
+
+<#include "/@includes/vv_imports.ftl" />
+
+/*
+ * This class is generated using freemarker and the ${.template_name} template.
+ */
+
+@SuppressWarnings("unused")
+public class UnionListWriter extends AbstractFieldWriter {
+
+  private ListVector vector;
+  private UInt4Vector offsets;
+  private PromotableWriter writer;
+  private boolean inMap = false;
+  private String mapName;
+  private int lastIndex = 0;
+
+  /**
+   * Creates a writer over the given list vector.  The superclass receives a null argument;
+   * the inner PromotableWriter wraps the list's data vector and the offsets vector is
+   * cached from the list.
+   *
+   * @param vector  list vector this writer writes into
+   */
+  public UnionListWriter(ListVector vector) {
+    super(null);
+    this.vector = vector;
+    this.writer = new PromotableWriter(vector.getDataVector(), vector);
+    this.offsets = vector.getOffsetVector();
+  }
+
+  /**
+   * Same as the single-argument constructor; the {@code parent} argument is not used.
+   *
+   * @param vector  list vector this writer writes into
+   * @param parent  ignored
+   */
+  public UnionListWriter(ListVector vector, AbstractFieldWriter parent) {
+    this(vector);
+  }
+
+  /** Allocates the underlying list vector by calling its allocateNew(). */
+  @Override
+  public void allocate() {
+    vector.allocateNew();
+  }
+
+  /** Clears the underlying list vector. */
+  @Override
+  public void clear() {
+    vector.clear();
+  }
+
+  /** Always returns null; this writer does not expose a MaterializedField. */
+  @Override
+  public MaterializedField getField() {
+    return null;
+  }
+
+  /** Returns the value capacity reported by the underlying list vector. */
+  @Override
+  public int getValueCapacity() {
+    return vector.getValueCapacity();
+  }
+
+  /** Intentionally empty: closing this writer performs no action. */
+  @Override
+  public void close() throws Exception {
+
+  }
+
+  <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+  <#assign fields = minor.fields!type.fields />
+  <#assign uncappedName = name?uncap_first/>
+
+  <#if !minor.class?starts_with("Decimal")>
+
+  /** Returns this writer itself as the ${name}Writer for list elements. */
+  @Override
+  public ${name}Writer <#if uncappedName == "int">integer<#else>${uncappedName}</#if>() {
+    return this;
+  }
+
+  /**
+   * Returns the ${name}Writer for the named field of the map currently being written.
+   * Marks the current list entry as not null and positions the inner writer at the
+   * offset stored for idx() + 1 before delegating.
+   *
+   * @param name  field name inside the map
+   * @return writer obtained from the inner writer for that field
+   */
+  @Override
+  public ${name}Writer <#if uncappedName == "int">integer<#else>${uncappedName}</#if>(String name) {
+    assert inMap;
+    mapName = name;
+    final int childPosition = offsets.getAccessor().get(idx() + 1);
+    vector.getMutator().setNotNull(idx());
+    writer.setPosition(childPosition);
+    return writer.<#if uncappedName == "int">integer<#else>${uncappedName}</#if>(name);
+  }
+
+  </#if>
+
+  </#list></#list>
+
+  /** Switches this writer into map mode (sets inMap) and returns itself as the MapWriter. */
+  @Override
+  public MapWriter map() {
+    inMap = true;
+    return this;
+  }
+
+  /**
+   * Starts a nested list element: marks the current entry not null, advances the stored
+   * end offset for idx() + 1 by one and positions the inner writer at the previous offset.
+   *
+   * @return the inner writer, used as the ListWriter for the nested list
+   */
+  @Override
+  public ListWriter list() {
+    final int childPosition = offsets.getAccessor().get(idx() + 1);
+    vector.getMutator().setNotNull(idx());
+    offsets.getMutator().setSafe(idx() + 1, childPosition + 1);
+    writer.setPosition(childPosition);
+    return writer;
+  }
+
+  /**
+   * Returns a ListWriter for the named field.  Marks the current entry not null and
+   * positions the inner writer at the offset stored for idx() + 1; the offset itself is
+   * not advanced here.
+   *
+   * @param name  field name
+   * @return list writer obtained from the inner writer
+   */
+  @Override
+  public ListWriter list(String name) {
+    final int childPosition = offsets.getAccessor().get(idx() + 1);
+    vector.getMutator().setNotNull(idx());
+    writer.setPosition(childPosition);
+    return writer.list(name);
+  }
+
+  /** Returns the MapWriter for the named field, obtained directly from the inner writer. */
+  @Override
+  public MapWriter map(String name) {
+    return writer.map(name);
+  }
+
+  /** Begins a new list value at the current index via the vector mutator's startNewValue. */
+  @Override
+  public void startList() {
+    vector.getMutator().startNewValue(idx());
+  }
+
+  /** Intentionally empty: ending a list performs no action in this writer. */
+  @Override
+  public void endList() {
+
+  }
+
+  /**
+   * Starts a map element inside the list.  Marks the current entry not null, rewrites the
+   * offset for idx() + 1 with its current value through setSafe, and positions the inner
+   * writer there.  Requires map mode (asserted).
+   */
+  @Override
+  public void start() {
+    assert inMap;
+    final int childPosition = offsets.getAccessor().get(idx() + 1);
+    vector.getMutator().setNotNull(idx());
+    offsets.getMutator().setSafe(idx() + 1, childPosition);
+    writer.setPosition(childPosition);
+  }
+
+  /**
+   * Ends a map element.  Outside map mode this does nothing; otherwise it leaves map mode
+   * and advances the stored offset for idx() + 1 by one.
+   */
+  @Override
+  public void end() {
+    if (!inMap) {
+      return;
+    }
+    inMap = false;
+    final int currentEnd = offsets.getAccessor().get(idx() + 1);
+    offsets.getMutator().setSafe(idx() + 1, currentEnd + 1);
+  }
+
+  <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+  <#assign fields = minor.fields!type.fields />
+  <#assign uncappedName = name?uncap_first/>
+
+  <#if !minor.class?starts_with("Decimal")>
+
+  /**
+   * Appends one ${name} value to the current list.  Marks the entry not null, writes the
+   * value through the inner writer at the offset stored for idx() + 1, then advances that
+   * offset by one.  Must not be called in map mode (asserted).
+   */
+  @Override
+  public void write${name}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>) {
+    assert !inMap;
+    final int writePosition = offsets.getAccessor().get(idx() + 1);
+    vector.getMutator().setNotNull(idx());
+    writer.setPosition(writePosition);
+    writer.write${name}(<#list fields as field>${field.name}<#if field_has_next>, </#if></#list>);
+    offsets.getMutator().setSafe(idx() + 1, writePosition + 1);
+  }
+
+  </#if>
+
+  </#list></#list>
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/9969d8bd/exec/vector/src/main/codegen/templates/UnionReader.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/UnionReader.java b/exec/vector/src/main/codegen/templates/UnionReader.java
new file mode 100644
index 0000000..46a32ee
--- /dev/null
+++ b/exec/vector/src/main/codegen/templates/UnionReader.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.vector.complex.impl.NullReader;
+
+<@pp.dropOutputFile />
+<@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/impl/UnionReader.java" />
+
+
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.vector.complex.impl;
+
+<#include "/@includes/vv_imports.ftl" />
+
+/**
+ * Reader for a {@link UnionVector}. For the current position it looks up the type value recorded
+ * in the vector and delegates to a reader for that type. Delegate readers are created on first
+ * use and cached by minor-type number.
+ */
+@SuppressWarnings("unused")
+public class UnionReader extends AbstractFieldReader {
+
+  /** Optional-mode major type for every minor type, indexed by the minor type's number. */
+  private static final MajorType[] TYPES = buildOptionalTypes();
+
+  /** Cached delegate readers, indexed by minor-type number; a slot stays null until first use. */
+  private final BaseReader[] readers = new BaseReader[TYPES.length];
+  public UnionVector data;
+
+  public UnionReader(UnionVector data) {
+    this.data = data;
+  }
+
+  /**
+   * Builds the type lookup table. The table is sized from the largest number declared by
+   * {@link MinorType} instead of a hard-coded length, so adding a minor type cannot overflow it.
+   */
+  private static MajorType[] buildOptionalTypes() {
+    final MinorType[] minorTypes = MinorType.values();
+    int maxNumber = 0;
+    for (MinorType minorType : minorTypes) {
+      maxNumber = Math.max(maxNumber, minorType.getNumber());
+    }
+    final MajorType[] types = new MajorType[maxNumber + 1];
+    for (MinorType minorType : minorTypes) {
+      types[minorType.getNumber()] = Types.optional(minorType);
+    }
+    return types;
+  }
+
+  /** Returns the optional major type matching the type value stored at the current position. */
+  public MajorType getType() {
+    return TYPES[data.getTypeValue(idx())];
+  }
+
+  /** Returns true when the vector's accessor does not report the current position as null. */
+  public boolean isSet(){
+    return !data.getAccessor().isNull(idx());
+  }
+
+  /** Points the holder at this reader and records whether the current position is set. */
+  public void read(UnionHolder holder) {
+    holder.reader = this;
+    holder.isSet = this.isSet() ? 1 : 0;
+  }
+
+  /** Reads element {@code index} of the list at the current position into the holder. */
+  public void read(int index, UnionHolder holder) {
+    getList().read(index, holder);
+  }
+
+  /**
+   * Returns the delegate reader for the type value stored at {@code index}. A cached reader is
+   * reused when present; type value 0 maps to {@link NullReader#INSTANCE}; otherwise the reader
+   * is created through the matching lazy getter.
+   *
+   * @throws UnsupportedOperationException if the type value has no reader
+   */
+  private FieldReader getReaderForIndex(int index) {
+    int typeValue = data.getTypeValue(index);
+    FieldReader reader = (FieldReader) readers[typeValue];
+    if (reader != null) {
+      return reader;
+    }
+    switch (typeValue) {
+    case 0:
+      return NullReader.INSTANCE;
+    case MinorType.MAP_VALUE:
+      return (FieldReader) getMap();
+    case MinorType.LIST_VALUE:
+      return (FieldReader) getList();
+    <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+    <#assign uncappedName = name?uncap_first/>
+    <#if !minor.class?starts_with("Decimal")>
+    case MinorType.${name?upper_case}_VALUE:
+      return (FieldReader) get${name}();
+    </#if>
+    </#list></#list>
+    default:
+      throw new UnsupportedOperationException("Unsupported type: " + MinorType.valueOf(typeValue));
+    }
+  }
+
+  private SingleMapReaderImpl mapReader;
+
+  /**
+   * Returns the map reader, creating it on first use, positioning it at the current index and
+   * registering it in the reader cache.
+   */
+  private MapReader getMap() {
+    if (mapReader == null) {
+      mapReader = (SingleMapReaderImpl) data.getMap().getReader();
+      mapReader.setPosition(idx());
+      readers[MinorType.MAP_VALUE] = mapReader;
+    }
+    return mapReader;
+  }
+
+  private UnionListReader listReader;
+
+  /**
+   * Returns the list reader, creating it on first use, positioning it at the current index and
+   * registering it in the reader cache.
+   */
+  private FieldReader getList() {
+    if (listReader == null) {
+      listReader = new UnionListReader(data.getList());
+      listReader.setPosition(idx());
+      readers[MinorType.LIST_VALUE] = listReader;
+    }
+    return listReader;
+  }
+
+  /** Returns the iterator of the map reader. */
+  @Override
+  public java.util.Iterator<String> iterator() {
+    return getMap().iterator();
+  }
+
+  /** Copies the value at this reader's position into the writer's vector at the writer's position. */
+  @Override
+  public void copyAsValue(UnionWriter writer) {
+    writer.data.copyFrom(idx(), writer.idx(), data);
+  }
+
+  <#-- One typed read method per friendly type; each delegates to the reader for the current type. -->
+  <#list ["Object", "BigDecimal", "Integer", "Long", "Boolean",
+          "Character", "DateTime", "Period", "Double", "Float",
+          "Text", "String", "Byte", "Short", "byte[]"] as friendlyType>
+  <#assign safeType=friendlyType />
+  <#if safeType=="byte[]"><#assign safeType="ByteArray" /></#if>
+
+  /** Delegates to the reader selected for the type stored at the current position. */
+  @Override
+  public ${friendlyType} read${safeType}() {
+    return getReaderForIndex(idx()).read${safeType}();
+  }
+
+  </#list>
+
+  <#-- Per minor type (Decimal types skipped): a lazily created reader plus read/copy delegates. -->
+  <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+          <#assign uncappedName = name?uncap_first/>
+  <#assign boxedType = (minor.boxedType!type.boxedType) />
+  <#assign javaType = (minor.javaType!type.javaType) />
+  <#assign friendlyType = (minor.friendlyType!minor.boxedType!type.boxedType) />
+  <#assign safeType=friendlyType />
+  <#if safeType=="byte[]"><#assign safeType="ByteArray" /></#if>
+  <#if !minor.class?starts_with("Decimal")>
+
+  private Nullable${name}ReaderImpl ${uncappedName}Reader;
+
+  /**
+   * Returns the reader for this type, creating it on first use, positioning it at the current
+   * index and registering it in the reader cache.
+   */
+  private Nullable${name}ReaderImpl get${name}() {
+    if (${uncappedName}Reader == null) {
+      ${uncappedName}Reader = new Nullable${name}ReaderImpl(data.get${name}Vector());
+      ${uncappedName}Reader.setPosition(idx());
+      readers[MinorType.${name?upper_case}_VALUE] = ${uncappedName}Reader;
+    }
+    return ${uncappedName}Reader;
+  }
+
+  /** Reads the current value into the holder through the reader for the current type. */
+  public void read(Nullable${name}Holder holder){
+    getReaderForIndex(idx()).read(holder);
+  }
+
+  /** Copies the current value to the writer through the reader for the current type. */
+  public void copyAsValue(${name}Writer writer){
+    getReaderForIndex(idx()).copyAsValue(writer);
+  }
+  </#if>
+  </#list></#list>
+
+  /** Copies the value at the current position into the list writer via {@code ComplexCopier}. */
+  @Override
+  public void copyAsValue(ListWriter writer) {
+    ComplexCopier.copy(this, (FieldWriter) writer);
+  }
+
+  /** Moves this reader and every delegate reader created so far to {@code index}. */
+  @Override
+  public void setPosition(int index) {
+    super.setPosition(index);
+    for (BaseReader reader : readers) {
+      if (reader != null) {
+        reader.setPosition(index);
+      }
+    }
+  }
+
+  /** Returns the map reader's child reader for {@code name}. */
+  public FieldReader reader(String name){
+    return getMap().reader(name);
+  }
+
+  /** Returns the list reader's element reader. */
+  public FieldReader reader() {
+    return getList().reader();
+  }
+
+  /** Advances the reader selected for the type stored at the current position. */
+  public boolean next() {
+    return getReaderForIndex(idx()).next();
+  }
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/drill/blob/9969d8bd/exec/vector/src/main/codegen/templates/UnionVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/UnionVector.java b/exec/vector/src/main/codegen/templates/UnionVector.java
new file mode 100644
index 0000000..dfcccb2
--- /dev/null
+++ b/exec/vector/src/main/codegen/templates/UnionVector.java
@@ -0,0 +1,479 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+
+<@pp.dropOutputFile />
+<@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/UnionVector.java" />
+
+
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.vector.complex;
+
+<#include "/@includes/vv_imports.ftl" />
+import java.util.Iterator;
+import org.apache.drill.exec.vector.complex.impl.ComplexCopier;
+import org.apache.drill.exec.util.CallBack;
+import org.apache.drill.common.expression.PathSegment;
+
+/*
+ * This class is generated using freemarker and the ${.template_name} template.
+ */
+@SuppressWarnings("unused")
+
+
+/**
+ * A vector which can hold values of different types. It does so by using a MapVector which contains a vector for each
+ * primitive type that is stored. MapVector is used in order to take advantage of its serialization/deserialization methods,
+ * as well as the addOrGet method.
+ *
+ * For performance reasons, UnionVector stores a cached reference to each subtype vector, to avoid having to do the map lookup
+ * each time the vector is accessed.
+ */
+public class UnionVector implements ValueVector {
+
+  private MaterializedField field;
+  private BufferAllocator allocator;
+  private Accessor accessor = new Accessor();
+  private Mutator mutator = new Mutator();
+  private int valueCount;
+
+  private MapVector internalMap;
+  private UInt1Vector typeVector;
+
+  private MapVector mapVector;
+  private ListVector listVector;
+
+  private FieldReader reader;
+  private NullableBitVector bit;
+
+  private int singleType = 0;
+  private ValueVector singleVector;
+  private MajorType majorType;
+
+  private final CallBack callBack;
+
+  public UnionVector(MaterializedField field, BufferAllocator allocator, CallBack callBack) {
+    this.field = field.clone();
+    this.allocator = allocator;
+    this.internalMap = new MapVector("internal", allocator, callBack);
+    this.typeVector = internalMap.addOrGet("types", Types.required(MinorType.UINT1), UInt1Vector.class);
+    this.field.addChild(internalMap.getField().clone());
+    this.majorType = field.getType();
+    this.callBack = callBack;
+  }
+
+  public List<MinorType> getSubTypes() {
+    return majorType.getSubTypeList();
+  }
+
+  private void addSubType(MinorType type) {
+    majorType =  MajorType.newBuilder(this.majorType).addSubType(type).build();
+    if (callBack != null) {
+      callBack.doWork();
+    }
+  }
+
+  private static final MajorType MAP_TYPE = Types.optional(MinorType.MAP);
+
+  public MapVector getMap() {
+    if (mapVector == null) {
+      int vectorCount = internalMap.size();
+      mapVector = internalMap.addOrGet("map", MAP_TYPE, MapVector.class);
+      addSubType(MinorType.MAP);
+      if (internalMap.size() > vectorCount) {
+        mapVector.allocateNew();
+      }
+    }
+    return mapVector;
+  }
+
+  <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+  <#assign fields = minor.fields!type.fields />
+  <#assign uncappedName = name?uncap_first/>
+  <#if !minor.class?starts_with("Decimal")>
+
+  private Nullable${name}Vector ${uncappedName}Vector;
+  private static final MajorType ${name?upper_case}_TYPE = Types.optional(MinorType.${name?upper_case});
+
+  public Nullable${name}Vector get${name}Vector() {
+    if (${uncappedName}Vector == null) {
+      int vectorCount = internalMap.size();
+      ${uncappedName}Vector = internalMap.addOrGet("${uncappedName}", ${name?upper_case}_TYPE, Nullable${name}Vector.class);
+      addSubType(MinorType.${name?upper_case});
+      if (internalMap.size() > vectorCount) {
+        ${uncappedName}Vector.allocateNew();
+      }
+    }
+    return ${uncappedName}Vector;
+  }
+
+  </#if>
+
+  </#list></#list>
+
+  private static final MajorType LIST_TYPE = Types.optional(MinorType.LIST);
+
+  public ListVector getList() {
+    if (listVector == null) {
+      int vectorCount = internalMap.size();
+      listVector = internalMap.addOrGet("list", LIST_TYPE, ListVector.class);
+      addSubType(MinorType.LIST);
+      if (internalMap.size() > vectorCount) {
+        listVector.allocateNew();
+      }
+    }
+    return listVector;
+  }
+
+  public int getTypeValue(int index) {
+    return typeVector.getAccessor().get(index);
+  }
+
+  public UInt1Vector getTypeVector() {
+    return typeVector;
+  }
+
+  @Override
+  public void allocateNew() throws OutOfMemoryRuntimeException {
+    internalMap.allocateNew();
+    if (typeVector != null) {
+      typeVector.zeroVector();
+    }
+  }
+
+  @Override
+  public boolean allocateNewSafe() {
+    boolean safe = internalMap.allocateNewSafe();
+    if (safe) {
+      if (typeVector != null) {
+        typeVector.zeroVector();
+      }
+    }
+    return safe;
+  }
+
+  @Override
+  public void setInitialCapacity(int numRecords) {
+  }
+
+  @Override
+  public int getValueCapacity() {
+    return Math.min(typeVector.getValueCapacity(), internalMap.getValueCapacity());
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public void clear() {
+    internalMap.clear();
+  }
+
+  @Override
+  public MaterializedField getField() {
+    return field;
+  }
+
+  @Override
+  public TransferPair getTransferPair() {
+    return new TransferImpl(field);
+  }
+
+  @Override
+  public TransferPair getTransferPair(FieldReference ref) {
+    return new TransferImpl(field.withPath(ref));
+  }
+
+  @Override
+  public TransferPair makeTransferPair(ValueVector target) {
+    return new TransferImpl((UnionVector) target);
+  }
+
+  public void transferTo(UnionVector target) {
+    internalMap.makeTransferPair(target.internalMap).transfer();
+    target.valueCount = valueCount;
+    target.majorType = majorType;
+  }
+
+  public void copyFrom(int inIndex, int outIndex, UnionVector from) {
+    from.getReader().setPosition(inIndex);
+    getWriter().setPosition(outIndex);
+    ComplexCopier.copy(from.reader, mutator.writer);
+  }
+
+  public void copyFromSafe(int inIndex, int outIndex, UnionVector from) {
+    copyFrom(inIndex, outIndex, from);
+  }
+
+  public void addVector(ValueVector v) {
+    String name = v.getField().getType().getMinorType().name().toLowerCase();
+    Preconditions.checkState(internalMap.getChild(name) == null, String.format("%s vector already exists", name));
+    internalMap.putChild(name, v);
+    addSubType(v.getField().getType().getMinorType());
+  }
+
+  private class TransferImpl implements TransferPair {
+
+    UnionVector to;
+
+    public TransferImpl(MaterializedField field) {
+      to = new UnionVector(field, allocator, null);
+    }
+
+    public TransferImpl(UnionVector to) {
+      this.to = to;
+    }
+
+    @Override
+    public void transfer() {
+      transferTo(to);
+    }
+
+    @Override
+    public void splitAndTransfer(int startIndex, int length) {
+
+    }
+
+    @Override
+    public ValueVector getTo() {
+      return to;
+    }
+
+    @Override
+    public void copyValueSafe(int from, int to) {
+      this.to.copyFrom(from, to, UnionVector.this);
+    }
+  }
+
+  @Override
+  public Accessor getAccessor() {
+    return accessor;
+  }
+
+  @Override
+  public Mutator getMutator() {
+    return mutator;
+  }
+
+  @Override
+  public FieldReader getReader() {
+    if (reader == null) {
+      reader = new UnionReader(this);
+    }
+    return reader;
+  }
+
+  public FieldWriter getWriter() {
+    if (mutator.writer == null) {
+      mutator.writer = new UnionWriter(this);
+    }
+    return mutator.writer;
+  }
+
+  @Override
+  public UserBitShared.SerializedField getMetadata() {
+    SerializedField.Builder b = getField() //
+            .getAsBuilder() //
+            .setBufferLength(getBufferSize()) //
+            .setValueCount(valueCount);
+
+    b.addChild(internalMap.getMetadata());
+    return b.build();
+  }
+
+  @Override
+  public int getBufferSize() {
+    return internalMap.getBufferSize();
+  }
+
+  @Override
+  public int getBufferSizeFor(final int valueCount) {
+    if (valueCount == 0) {
+      return 0;
+    }
+
+    long bufferSize = 0;
+    for (final ValueVector v : (Iterable<ValueVector>) this) {
+      bufferSize += v.getBufferSizeFor(valueCount);
+    }
+
+    return (int) bufferSize;
+  }
+
+  @Override
+  public DrillBuf[] getBuffers(boolean clear) {
+    return internalMap.getBuffers(clear);
+  }
+
+  @Override
+  public void load(UserBitShared.SerializedField metadata, DrillBuf buffer) {
+    valueCount = metadata.getValueCount();
+
+    internalMap.load(metadata.getChild(0), buffer);
+  }
+
+  @Override
+  public Iterator<ValueVector> iterator() {
+    List<ValueVector> vectors = Lists.newArrayList(internalMap.iterator());
+    vectors.add(typeVector);
+    return vectors.iterator();
+  }
+
+  public TypedFieldId getFieldIdIfMatches(TypedFieldId.Builder builder, boolean addToBreadCrumb, PathSegment seg) {
+    if (seg.isNamed()) {
+      ValueVector v = getMap();
+      if (v != null) {
+        return ((AbstractContainerVector) v).getFieldIdIfMatches(builder, addToBreadCrumb, seg);
+      } else {
+        return null;
+      }
+    } else if (seg.isArray()) {
+      ValueVector v = getList();
+      if (v != null) {
+        return ((ListVector) v).getFieldIdIfMatches(builder, addToBreadCrumb, seg);
+      }
+      else return null;
+    }
+    return null;
+  }
+
+  public class Accessor extends BaseValueVector.BaseAccessor {
+
+
+    @Override
+    public Object getObject(int index) {
+      int type = typeVector.getAccessor().get(index);
+      switch (type) {
+      case 0:
+        return null;
+      <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+      <#assign fields = minor.fields!type.fields />
+      <#assign uncappedName = name?uncap_first/>
+      <#if !minor.class?starts_with("Decimal")>
+      case MinorType.${name?upper_case}_VALUE:
+        return get${name}Vector().getAccessor().getObject(index);
+      </#if>
+
+      </#list></#list>
+      case MinorType.MAP_VALUE:
+        return getMap().getAccessor().getObject(index);
+      case MinorType.LIST_VALUE:
+        return getList().getAccessor().getObject(index);
+      default:
+        throw new UnsupportedOperationException("Cannot support type: " + MinorType.valueOf(type));
+      }
+    }
+
+    public byte[] get(int index) {
+      return null;
+    }
+
+    public void get(int index, ComplexHolder holder) {
+    }
+
+    public void get(int index, UnionHolder holder) {
+      if (reader == null) {
+        reader = new UnionReader(UnionVector.this);
+      }
+      reader.setPosition(index);
+      holder.reader = reader;
+    }
+
+    @Override
+    public int getValueCount() {
+      return valueCount;
+    }
+
+    @Override
+    public boolean isNull(int index) {
+      return typeVector.getAccessor().get(index) == 0;
+    }
+
+    public int isSet(int index) {
+      return isNull(index) ? 0 : 1;
+    }
+  }
+
+  public class Mutator extends BaseValueVector.BaseMutator {
+
+    UnionWriter writer;
+
+    @Override
+    public void setValueCount(int valueCount) {
+      UnionVector.this.valueCount = valueCount;
+      internalMap.getMutator().setValueCount(valueCount);
+    }
+
+    public void setSafe(int index, UnionHolder holder) {
+      FieldReader reader = holder.reader;
+      if (writer == null) {
+        writer = new UnionWriter(UnionVector.this);
+      }
+      writer.setPosition(index);
+      MinorType type = reader.getType().getMinorType();
+      switch (type) {
+      <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+      <#assign fields = minor.fields!type.fields />
+      <#assign uncappedName = name?uncap_first/>
+      <#if !minor.class?starts_with("Decimal")>
+      case ${name?upper_case}:
+        Nullable${name}Holder ${uncappedName}Holder = new Nullable${name}Holder();
+        reader.read(${uncappedName}Holder);
+        setSafe(index, ${uncappedName}Holder);
+        break;
+      </#if>
+      </#list></#list>
+      case MAP: {
+        ComplexCopier.copy(reader, writer);
+        break;
+      }
+      case LIST: {
+        ComplexCopier.copy(reader, writer);
+        break;
+      }
+      default:
+        throw new UnsupportedOperationException();
+      }
+    }
+
+    <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+    <#assign fields = minor.fields!type.fields />
+    <#assign uncappedName = name?uncap_first/>
+    <#if !minor.class?starts_with("Decimal")>
+    public void setSafe(int index, Nullable${name}Holder holder) {
+      setType(index, MinorType.${name?upper_case});
+      get${name}Vector().getMutator().setSafe(index, holder);
+    }
+
+    </#if>
+    </#list></#list>
+
+    public void setType(int index, MinorType type) {
+      typeVector.getMutator().setSafe(index, type.getNumber());
+    }
+
+    @Override
+    public void reset() { }
+
+    @Override
+    public void generateTestData(int values) { }
+  }
+}


Mime
View raw message