drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject [4/6] drill git commit: DRILL-3229: Implement Union type vector
Date Tue, 03 Nov 2015 08:44:37 GMT
DRILL-3229: Implement Union type vector


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/eb6325dc
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/eb6325dc
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/eb6325dc

Branch: refs/heads/master
Commit: eb6325dc9b59291582cd7d3c3e5d02efd5d15906
Parents: ca5a847
Author: Steven Phillips <smp@apache.org>
Authored: Thu Oct 1 03:26:34 2015 -0700
Committer: Steven Phillips <smp@apache.org>
Committed: Mon Nov 2 22:28:32 2015 -0800

----------------------------------------------------------------------
 .../common/expression/fn/CastFunctions.java     |   1 +
 .../org/apache/drill/common/types/Types.java    |   5 +
 .../codegen/templates/AbstractFieldReader.java  |   4 +
 .../codegen/templates/AbstractFieldWriter.java  |  10 +
 .../codegen/templates/AbstractRecordWriter.java |   5 +
 .../src/main/codegen/templates/BaseReader.java  |   3 +
 .../src/main/codegen/templates/BaseWriter.java  |   4 +-
 .../main/codegen/templates/ComplexCopier.java   | 134 ++++++
 .../main/codegen/templates/ComplexReaders.java  |   9 +-
 .../templates/EventBasedRecordWriter.java       |   4 +
 .../codegen/templates/HolderReaderImpl.java     |  22 +
 .../src/main/codegen/templates/ListWriters.java |   8 +-
 .../src/main/codegen/templates/MapWriters.java  |  46 +-
 .../src/main/codegen/templates/NullReader.java  |  10 +-
 .../main/codegen/templates/RecordWriter.java    |   1 +
 .../src/main/codegen/templates/TypeHelper.java  |  25 +-
 .../main/codegen/templates/UnionFunctions.java  |  88 ++++
 .../main/codegen/templates/UnionListWriter.java | 187 ++++++++
 .../src/main/codegen/templates/UnionReader.java | 183 ++++++++
 .../src/main/codegen/templates/UnionVector.java | 433 +++++++++++++++++++
 .../src/main/codegen/templates/UnionWriter.java | 228 ++++++++++
 .../org/apache/drill/exec/ExecConstants.java    |   2 +
 .../exec/expr/ExpressionTreeMaterializer.java   |  83 ++--
 .../drill/exec/expr/GetSetVectorHelper.java     |   4 +
 .../drill/exec/expr/fn/DrillFuncHolder.java     |   2 +-
 .../drill/exec/expr/fn/impl/MappifyUtility.java |   4 +-
 .../drill/exec/expr/fn/impl/UnionFunctions.java | 115 +++++
 .../drill/exec/expr/holders/UnionHolder.java    |  37 ++
 .../physical/impl/filter/FilterRecordBatch.java |   4 +-
 .../impl/flatten/FlattenRecordBatch.java        |   4 +-
 .../impl/project/ProjectRecordBatch.java        |   5 +-
 .../drill/exec/record/AbstractRecordBatch.java  |   9 +
 .../drill/exec/record/SimpleVectorWrapper.java  |  11 +
 .../exec/resolver/ResolverTypePrecedence.java   |   1 +
 .../drill/exec/resolver/TypeCastRules.java      |  10 +
 .../server/options/SystemOptionManager.java     |   1 +
 .../drill/exec/store/avro/MapOrListWriter.java  |   4 +-
 .../exec/store/easy/json/JSONRecordReader.java  |   4 +-
 .../exec/store/easy/json/JsonRecordWriter.java  |  24 +
 .../exec/vector/accessor/UnionSqlAccessor.java  | 129 ++++++
 .../drill/exec/vector/complex/ListVector.java   | 292 +++++++++++++
 .../exec/vector/complex/fn/JsonReader.java      | 147 ++++---
 .../exec/vector/complex/fn/JsonWriter.java      |  11 +-
 .../vector/complex/impl/AbstractBaseReader.java |  17 +
 .../vector/complex/impl/ComplexWriterImpl.java  |  12 +-
 .../vector/complex/impl/UnionListReader.java    |  90 ++++
 .../complex/impl/VectorContainerWriter.java     |   8 +-
 .../drill/exec/store/TestOutputMutator.java     |   4 +
 .../vector/complex/writer/TestJsonReader.java   | 104 +++++
 .../vector/complex/writer/TestRepeated.java     |  24 +-
 .../src/test/resources/jsoninput/union/a.json   |  52 +++
 .../drill/jdbc/test/TestJdbcDistQuery.java      |   1 -
 .../apache/drill/common/types/MinorType.java    |   4 +-
 .../apache/drill/common/types/TypeProtos.java   |  18 +-
 protocol/src/main/protobuf/Types.proto          |   1 +
 55 files changed, 2492 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/common/src/main/java/org/apache/drill/common/expression/fn/CastFunctions.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/expression/fn/CastFunctions.java b/common/src/main/java/org/apache/drill/common/expression/fn/CastFunctions.java
index c0eaa90..34997ab 100644
--- a/common/src/main/java/org/apache/drill/common/expression/fn/CastFunctions.java
+++ b/common/src/main/java/org/apache/drill/common/expression/fn/CastFunctions.java
@@ -36,6 +36,7 @@ public class CastFunctions {
   private static Map<String, String> CAST_FUNC_REPLACEMENT_FROM_NULLABLE = new HashMap<>();
 
   static {
+    TYPE2FUNC.put(MinorType.UNION, "castUNION");
     TYPE2FUNC.put(MinorType.BIGINT, "castBIGINT");
     TYPE2FUNC.put(MinorType.INT, "castINT");
     TYPE2FUNC.put(MinorType.BIT, "castBIT");

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/common/src/main/java/org/apache/drill/common/types/Types.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/types/Types.java b/common/src/main/java/org/apache/drill/common/types/Types.java
index 69b1b4c..adc3eef 100644
--- a/common/src/main/java/org/apache/drill/common/types/Types.java
+++ b/common/src/main/java/org/apache/drill/common/types/Types.java
@@ -143,6 +143,7 @@ public class Types {
       case MAP:             return "MAP";
       case LATE:            return "ANY";
       case NULL:            return "NULL";
+      case UNION:           return "UNION";
 
       // Internal types not actually used at level of SQL types(?):
 
@@ -228,6 +229,8 @@ public class Types {
       return java.sql.Types.VARBINARY;
     case VARCHAR:
       return java.sql.Types.VARCHAR;
+    case UNION:
+      return java.sql.Types.OTHER;
     default:
       // TODO:  This isn't really an unsupported-operation/-type case; this
       //   is an unexpected, code-out-of-sync-with-itself case, so use an
@@ -290,6 +293,7 @@ public class Types {
           case LATE:
           case LIST:
           case MAP:
+          case UNION:
           case NULL:
           case TIMETZ:      // SQL TIME WITH TIME ZONE
           case TIMESTAMPTZ: // SQL TIMESTAMP WITH TIME ZONE
@@ -340,6 +344,7 @@ public class Types {
     case VARBINARY:
     case VAR16CHAR:
     case VARCHAR:
+    case UNION:
       return false;
     default:
       return true;

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/codegen/templates/AbstractFieldReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/AbstractFieldReader.java b/exec/java-exec/src/main/codegen/templates/AbstractFieldReader.java
index 5420f99..89afd7c 100644
--- a/exec/java-exec/src/main/codegen/templates/AbstractFieldReader.java
+++ b/exec/java-exec/src/main/codegen/templates/AbstractFieldReader.java
@@ -67,6 +67,10 @@ abstract class AbstractFieldReader extends AbstractBaseReader implements FieldRe
   <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
   <#assign boxedType = (minor.boxedType!type.boxedType) />
 
+  public void read(${name}Holder holder){
+    fail("${name}");
+  }
+
   public void read(Nullable${name}Holder holder){
     fail("${name}");
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java b/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java
index 9b67304..2da7141 100644
--- a/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java
@@ -45,6 +45,16 @@ abstract class AbstractFieldWriter extends AbstractBaseWriter implements FieldWr
     throw new IllegalStateException(String.format("You tried to end when you are using a ValueWriter of type %s.", this.getClass().getSimpleName()));
   }
 
+  @Override
+  public void startList() {
+    throw new IllegalStateException(String.format("You tried to start when you are using a ValueWriter of type %s.", this.getClass().getSimpleName()));
+  }
+
+  @Override
+  public void endList() {
+    throw new IllegalStateException(String.format("You tried to end when you are using a ValueWriter of type %s.", this.getClass().getSimpleName()));
+  }
+
   <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
   <#assign fields = minor.fields!type.fields />
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java b/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
index 5f1f42f..13f7482 100644
--- a/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
@@ -55,6 +55,11 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   }
 
   @Override
+  public FieldConverter getNewUnionConverter(int fieldId, String fieldName, FieldReader reader) {
+    throw new UnsupportedOperationException("Doesn't support writing Union type'");
+  }
+
+  @Override
   public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) {
     throw new UnsupportedOperationException("Doesn't support writing RepeatedMap");
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/codegen/templates/BaseReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/BaseReader.java b/exec/java-exec/src/main/codegen/templates/BaseReader.java
index 116deda..4fce409 100644
--- a/exec/java-exec/src/main/codegen/templates/BaseReader.java
+++ b/exec/java-exec/src/main/codegen/templates/BaseReader.java
@@ -33,6 +33,9 @@ public interface BaseReader extends Positionable{
   MajorType getType();
   MaterializedField getField();
   void reset();
+  void read(UnionHolder holder);
+  void read(int index, UnionHolder holder);
+  void copyAsValue(UnionWriter writer);
 
   public interface MapReader extends BaseReader, Iterable<String>{
     FieldReader reader(String name);

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/codegen/templates/BaseWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/BaseWriter.java b/exec/java-exec/src/main/codegen/templates/BaseWriter.java
index 7697880..da27e66 100644
--- a/exec/java-exec/src/main/codegen/templates/BaseWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/BaseWriter.java
@@ -57,8 +57,8 @@ package org.apache.drill.exec.vector.complex.writer;
   }
 
   public interface ListWriter extends BaseWriter {
-    void start();
-    void end();
+    void startList();
+    void endList();
     MapWriter map();
     ListWriter list();
     void copyReader(FieldReader reader);

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/codegen/templates/ComplexCopier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ComplexCopier.java b/exec/java-exec/src/main/codegen/templates/ComplexCopier.java
new file mode 100644
index 0000000..c3b5cb5
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/templates/ComplexCopier.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+<@pp.dropOutputFile />
+<@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/impl/ComplexCopier.java" />
+
+
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.vector.complex.impl;
+
+<#include "/@includes/vv_imports.ftl" />
+
+/*
+ * This class is generated using freemarker and the ${.template_name} template.
+ */
+@SuppressWarnings("unused")
+public class ComplexCopier {
+
+  FieldReader in;
+  FieldWriter out;
+
+  public ComplexCopier(FieldReader in, FieldWriter out) {
+    this.in = in;
+    this.out = out;
+  }
+
+  public void write() {
+    writeValue(in, out);
+  }
+
+  private void writeValue(FieldReader reader, FieldWriter writer) {
+    final DataMode m = reader.getType().getMode();
+    final MinorType mt = reader.getType().getMinorType();
+
+    switch(m){
+    case OPTIONAL:
+    case REQUIRED:
+
+
+      switch (mt) {
+
+      case LIST:
+        writer.startList();
+        while (reader.next()) {
+          writeValue(reader.reader(), getListWriterForReader(reader.reader(), writer));
+        }
+        writer.endList();
+        break;
+      case MAP:
+        writer.start();
+        if (reader.isSet()) {
+          for(String name : reader){
+            FieldReader childReader = reader.reader(name);
+            if(childReader.isSet()){
+              writeValue(childReader, getMapWriterForReader(childReader, writer, name));
+            }
+          }
+        }
+        writer.end();
+        break;
+  <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+  <#assign fields = minor.fields!type.fields />
+  <#assign uncappedName = name?uncap_first/>
+  <#if !minor.class?starts_with("Decimal")>
+
+      case ${name?upper_case}:
+        if (reader.isSet()) {
+          Nullable${name}Holder ${uncappedName}Holder = new Nullable${name}Holder();
+          reader.read(${uncappedName}Holder);
+          writer.write${name}(<#list fields as field>${uncappedName}Holder.${field.name}<#if field_has_next>, </#if></#list>);
+        }
+        break;
+
+  </#if>
+  </#list></#list>
+      }
+              break;
+    }
+ }
+
+  private FieldWriter getMapWriterForReader(FieldReader reader, MapWriter writer, String name) {
+    switch (reader.getType().getMinorType()) {
+    <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+    <#assign fields = minor.fields!type.fields />
+    <#assign uncappedName = name?uncap_first/>
+    <#if !minor.class?starts_with("Decimal")>
+    case ${name?upper_case}:
+      return (FieldWriter) writer.<#if name == "Int">integer<#else>${uncappedName}</#if>(name);
+    </#if>
+    </#list></#list>
+    case MAP:
+      return (FieldWriter) writer.map(name);
+    case LIST:
+      return (FieldWriter) writer.list(name);
+    default:
+      throw new UnsupportedOperationException(reader.getType().toString());
+    }
+  }
+
+  private FieldWriter getListWriterForReader(FieldReader reader, ListWriter writer) {
+    switch (reader.getType().getMinorType()) {
+    <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+    <#assign fields = minor.fields!type.fields />
+    <#assign uncappedName = name?uncap_first/>
+    <#if !minor.class?starts_with("Decimal")>
+    case ${name?upper_case}:
+    return (FieldWriter) writer.<#if name == "Int">integer<#else>${uncappedName}</#if>();
+    </#if>
+    </#list></#list>
+    case MAP:
+      return (FieldWriter) writer.map();
+    case LIST:
+      return (FieldWriter) writer.list();
+    default:
+      throw new UnsupportedOperationException(reader.getType().toString());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/codegen/templates/ComplexReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ComplexReaders.java b/exec/java-exec/src/main/codegen/templates/ComplexReaders.java
index 068efb4..607b71d 100644
--- a/exec/java-exec/src/main/codegen/templates/ComplexReaders.java
+++ b/exec/java-exec/src/main/codegen/templates/ComplexReaders.java
@@ -119,7 +119,13 @@ public class ${nullMode}${name}ReaderImpl extends AbstractFieldReader {
     ${nullMode}${minor.class?cap_first}WriterImpl impl = (${nullMode}${minor.class?cap_first}WriterImpl) writer.${lowerName}(name);
     impl.vector.copyFromSafe(idx(), impl.idx(), vector);
   }
-  
+
+  <#if nullMode != "Nullable">
+  public void read(${minor.class?cap_first}Holder h){
+    vector.getAccessor().get(idx(), h);
+  }
+  </#if>
+
   public void read(Nullable${minor.class?cap_first}Holder h){
     vector.getAccessor().get(idx(), h);
   }
@@ -157,6 +163,7 @@ public interface ${name}Reader extends BaseReader{
   public Object readObject(int arrayIndex);
   public ${friendlyType} read${safeType}(int arrayIndex);
   <#else>
+  public void read(${minor.class?cap_first}Holder h);
   public void read(Nullable${minor.class?cap_first}Holder h);
   public Object readObject();
   public ${friendlyType} read${safeType}();

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
index cf1529d..bf447c9 100644
--- a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
@@ -31,6 +31,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.planner.physical.WriterPrel;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.complex.impl.UnionReader;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
 import java.io.IOException;
@@ -118,6 +119,9 @@ public class EventBasedRecordWriter {
   }
 
   public static FieldConverter getConverter(RecordWriter recordWriter, int fieldId, String fieldName, FieldReader reader) {
+    if (reader instanceof UnionReader) {
+      return recordWriter.getNewUnionConverter(fieldId, fieldName, reader);
+    }
     switch (reader.getType().getMinorType()) {
       case MAP:
         switch (reader.getType().getMode()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/codegen/templates/HolderReaderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/HolderReaderImpl.java b/exec/java-exec/src/main/codegen/templates/HolderReaderImpl.java
index 3bf9403..9bb4f82 100644
--- a/exec/java-exec/src/main/codegen/templates/HolderReaderImpl.java
+++ b/exec/java-exec/src/main/codegen/templates/HolderReaderImpl.java
@@ -30,6 +30,7 @@
 <#assign friendlyType = (minor.friendlyType!minor.boxedType!type.boxedType) />
 <#assign safeType=friendlyType />
 <#if safeType=="byte[]"><#assign safeType="ByteArray" /></#if>
+<#assign fields = minor.fields!type.fields />
 
 <@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/impl/${holderMode}${name}HolderReaderImpl.java" />
 <#include "/@includes/license.ftl" />
@@ -116,6 +117,22 @@ public class ${holderMode}${name}HolderReaderImpl extends AbstractFieldReader {
     
   }
 
+<#if holderMode != "Repeated">
+@Override
+  public void read(${name}Holder h) {
+  <#list fields as field>
+    h.${field.name} = holder.${field.name};
+  </#list>
+  }
+
+  @Override
+  public void read(Nullable${name}Holder h) {
+  <#list fields as field>
+    h.${field.name} = holder.${field.name};
+  </#list>
+  }
+</#if>
+
 <#if holderMode == "Repeated">
   @Override
   public ${friendlyType} read${safeType}(int index){
@@ -262,6 +279,11 @@ public class ${holderMode}${name}HolderReaderImpl extends AbstractFieldReader {
 </#if>
   }
 
+<#if holderMode != "Repeated" && nullMode != "Nullable">
+  public void copyAsValue(${minor.class?cap_first}Writer writer){
+    writer.write(holder);
+  }
+</#if>
 }
 
 </#list>

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/codegen/templates/ListWriters.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ListWriters.java b/exec/java-exec/src/main/codegen/templates/ListWriters.java
index f2683a3..16d41ec 100644
--- a/exec/java-exec/src/main/codegen/templates/ListWriters.java
+++ b/exec/java-exec/src/main/codegen/templates/ListWriters.java
@@ -181,7 +181,7 @@ public class ${mode}ListWriter extends AbstractFieldWriter {
 
   <#if mode == "Repeated">
 
-  public void start() {
+  public void startList() {
     final RepeatedListVector list = (RepeatedListVector) container;
     final RepeatedListVector.RepeatedMutator mutator = list.getMutator();
 
@@ -202,7 +202,7 @@ public class ${mode}ListWriter extends AbstractFieldWriter {
     }
   }
 
-  public void end() {
+  public void endList() {
     // noop, we initialize state at start rather than end.
   }
   <#else>
@@ -214,11 +214,11 @@ public class ${mode}ListWriter extends AbstractFieldWriter {
     }
   }
 
-  public void start() {
+  public void startList() {
     // noop
   }
 
-  public void end() {
+  public void endList() {
     // noop
   }
   </#if>

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/codegen/templates/MapWriters.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/MapWriters.java b/exec/java-exec/src/main/codegen/templates/MapWriters.java
index 9683338..42cacab 100644
--- a/exec/java-exec/src/main/codegen/templates/MapWriters.java
+++ b/exec/java-exec/src/main/codegen/templates/MapWriters.java
@@ -52,9 +52,18 @@ public class ${mode}MapWriter extends AbstractFieldWriter {
   private final Map<String, FieldWriter> fields = Maps.newHashMap();
   <#if mode == "Repeated">private int currentChildIndex = 0;</#if>
 
-  public ${mode}MapWriter(${containerClass} container, FieldWriter parent) {
+  private final boolean unionEnabled;
+  private final boolean unionInternalMap;
+
+  public ${mode}MapWriter(${containerClass} container, FieldWriter parent, boolean unionEnabled, boolean unionInternalMap) {
     super(parent);
     this.container = container;
+    this.unionEnabled = unionEnabled;
+    this.unionInternalMap = unionInternalMap;
+  }
+
+  public ${mode}MapWriter(${containerClass} container, FieldWriter parent) {
+    this(container, parent, false, false);
   }
 
   @Override
@@ -70,10 +79,15 @@ public class ${mode}MapWriter extends AbstractFieldWriter {
   @Override
   public MapWriter map(String name) {
       FieldWriter writer = fields.get(name.toLowerCase());
-    if(writer == null) {
-      int vectorCount = container.size();
-      MapVector vector = container.addOrGet(name, MapVector.TYPE, MapVector.class);
-      writer = new SingleMapWriter(vector, this);
+    if(writer == null){
+      int vectorCount=container.size();
+      if(!unionEnabled || unionInternalMap){
+        MapVector vector=container.addOrGet(name,MapVector.TYPE,MapVector.class);
+        writer=new SingleMapWriter(vector,this);
+      } else {
+        UnionVector vector = container.addOrGet(name, Types.optional(MinorType.UNION), UnionVector.class);
+        writer = new UnionWriter(vector);
+      }
       if(vectorCount != container.size()) {
         writer.allocate();
       }
@@ -108,8 +122,16 @@ public class ${mode}MapWriter extends AbstractFieldWriter {
   @Override
   public ListWriter list(String name) {
     FieldWriter writer = fields.get(name.toLowerCase());
+    int vectorCount = container.size();
     if(writer == null) {
-      writer = new SingleListWriter(name, container, this);
+      if (!unionEnabled){
+        writer = new SingleListWriter(name,container,this);
+      } else{
+        writer = new UnionWriter(container.addOrGet(name, Types.optional(MinorType.UNION), UnionVector.class));
+      }
+      if (container.size() > vectorCount) {
+        writer.allocate();
+      }
       writer.setPosition(${index});
       fields.put(name.toLowerCase(), writer);
     }
@@ -191,9 +213,17 @@ public class ${mode}MapWriter extends AbstractFieldWriter {
   </#if>
     FieldWriter writer = fields.get(name.toLowerCase());
     if(writer == null) {
-      final ${vectName}Vector vector = container.addOrGet(name, ${upperName}_TYPE, ${vectName}Vector.class);
+      ValueVector vector;
+      if (unionEnabled){
+        UnionVector v = container.addOrGet(name, Types.optional(MinorType.UNION), UnionVector.class);
+        writer = new UnionWriter(v);
+        vector = v;
+      } else {
+        ${vectName}Vector v = container.addOrGet(name, ${upperName}_TYPE, ${vectName}Vector.class);
+        writer = new ${vectName}WriterImpl(v, this);
+        vector = v;
+      }
       vector.allocateNewSafe();
-      writer = new ${vectName}WriterImpl(vector, this);
       writer.setPosition(${index});
       fields.put(name.toLowerCase(), writer);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/codegen/templates/NullReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/NullReader.java b/exec/java-exec/src/main/codegen/templates/NullReader.java
index a0e5f50..472dbed 100644
--- a/exec/java-exec/src/main/codegen/templates/NullReader.java
+++ b/exec/java-exec/src/main/codegen/templates/NullReader.java
@@ -56,11 +56,17 @@ public class NullReader extends AbstractBaseReader implements FieldReader{
 
   public void copyAsValue(ListWriter writer) {}
 
-  <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first /> 
+  public void copyAsValue(UnionWriter writer) {}
+
+  <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+  public void read(${name}Holder holder){
+    throw new UnsupportedOperationException("NullReader cannot read into non-nullable holder");
+  }
+
   public void read(Nullable${name}Holder holder){
     holder.isSet = 0;
   }
-  
+
   public void read(int arrayIndex, ${name}Holder holder){
     throw new ArrayIndexOutOfBoundsException();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/codegen/templates/RecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/RecordWriter.java b/exec/java-exec/src/main/codegen/templates/RecordWriter.java
index a37ffa8..24a94c4 100644
--- a/exec/java-exec/src/main/codegen/templates/RecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/RecordWriter.java
@@ -62,6 +62,7 @@ public interface RecordWriter {
 
   /** Add the field value given in <code>valueHolder</code> at the given column number <code>fieldId</code>. */
   public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader);
+  public FieldConverter getNewUnionConverter(int fieldId, String fieldName, FieldReader reader);
   public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader);
   public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/codegen/templates/TypeHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/TypeHelper.java b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
index 9c66cb7..d9810c7 100644
--- a/exec/java-exec/src/main/codegen/templates/TypeHelper.java
+++ b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
@@ -16,6 +16,8 @@
  * limitations under the License.
  */
 
+import org.apache.drill.exec.vector.complex.UnionVector;
+
 <@pp.dropOutputFile />
 <@pp.changeOutputFile name="/org/apache/drill/exec/expr/TypeHelper.java" />
 
@@ -71,6 +73,8 @@ public class TypeHelper {
   public static SqlAccessor getSqlAccessor(ValueVector vector){
     final MajorType type = vector.getField().getType();
     switch(type.getMinorType()){
+    case UNION:
+      return new UnionSqlAccessor((UnionVector) vector);
     <#list vv.types as type>
     <#list type.minor as minor>
     case ${minor.class?upper_case}:
@@ -100,8 +104,11 @@ public class TypeHelper {
   
   public static Class<?> getValueVectorClass(MinorType type, DataMode mode){
     switch (type) {
+    case UNION:
+      return UnionVector.class;
     case MAP:
       switch (mode) {
+      case OPTIONAL:
       case REQUIRED:
         return MapVector.class;
       case REPEATED:
@@ -112,6 +119,9 @@ public class TypeHelper {
       switch (mode) {
       case REPEATED:
         return RepeatedListVector.class;
+      case REQUIRED:
+      case OPTIONAL:
+        return ListVector.class;
       }
     
 <#list vv.types as type>
@@ -175,6 +185,7 @@ public class TypeHelper {
   
   public static Class<?> getWriterInterface( MinorType type, DataMode mode){
     switch (type) {
+    case UNION: return UnionWriter.class;
     case MAP: return MapWriter.class;
     case LIST: return ListWriter.class;
 <#list vv.types as type>
@@ -190,6 +201,8 @@ public class TypeHelper {
   
   public static Class<?> getWriterImpl( MinorType type, DataMode mode){
     switch (type) {
+    case UNION:
+      return UnionWriter.class;
     case MAP:
       switch (mode) {
       case REQUIRED:
@@ -247,6 +260,8 @@ public class TypeHelper {
   
   public static JType getHolderType(JCodeModel model, MinorType type, DataMode mode){
     switch (type) {
+    case UNION:
+      return model._ref(UnionHolder.class);
     case MAP:
     case LIST:
       return model._ref(ComplexHolder.class);
@@ -280,10 +295,13 @@ public class TypeHelper {
 
     switch (type.getMinorType()) {
     
-    
+    case UNION:
+      return new UnionVector(field, allocator, callBack);
+
     case MAP:
       switch (type.getMode()) {
       case REQUIRED:
+      case OPTIONAL:
         return new MapVector(field, allocator, callBack);
       case REPEATED:
         return new RepeatedMapVector(field, allocator, callBack);
@@ -292,7 +310,10 @@ public class TypeHelper {
       switch (type.getMode()) {
       case REPEATED:
         return new RepeatedListVector(field, allocator, callBack);
-      }    
+      case OPTIONAL:
+      case REQUIRED:
+        return new ListVector(field, allocator, callBack);
+      }
 <#list vv.  types as type>
   <#list type.minor as minor>
     case ${minor.class?upper_case}:

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/codegen/templates/UnionFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/UnionFunctions.java b/exec/java-exec/src/main/codegen/templates/UnionFunctions.java
new file mode 100644
index 0000000..41b6b00
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/templates/UnionFunctions.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+<@pp.dropOutputFile />
+<@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/impl/UnionFunctions.java" />
+
+
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.vector.complex.impl;
+
+<#include "/@includes/vv_imports.ftl" />
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.holders.*;
+import javax.inject.Inject;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.record.RecordBatch;
+
+/*
+ * This class is generated using freemarker and the ${.template_name} template.
+ */
+
+@SuppressWarnings("unused")
+public class UnionFunctions {
+
+  <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+  <#assign fields = minor.fields!type.fields />
+  <#assign uncappedName = name?uncap_first/>
+
+  <#if !minor.class?starts_with("Decimal")>
+
+  @SuppressWarnings("unused")
+  @FunctionTemplate(name = "as${name}", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.INTERNAL)
+  public static class CastUnion${name} implements DrillSimpleFunc {
+
+    @Param UnionHolder in;
+    @Output Nullable${name}Holder out;
+
+    public void setup() {}
+
+    public void eval() {
+      if (in.isSet == 1) {
+        in.reader.read(out);
+      } else {
+        out.isSet = 0;
+      }
+    }
+  }
+
+  @SuppressWarnings("unused")
+  @FunctionTemplate(names = {"castUNION", "castToUnion"}, scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.INTERNAL)
+  public static class Cast${name}ToUnion implements DrillSimpleFunc {
+
+    @Param Nullable${name}Holder in;
+    @Output UnionHolder out;
+
+    public void setup() {}
+
+    public void eval() {
+      out.reader = new org.apache.drill.exec.vector.complex.impl.Nullable${name}HolderReaderImpl(in);
+      out.isSet = in.isSet;
+    }
+  }
+
+  </#if>
+
+  </#list></#list>
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/codegen/templates/UnionListWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/UnionListWriter.java b/exec/java-exec/src/main/codegen/templates/UnionListWriter.java
new file mode 100644
index 0000000..fd7256b
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/templates/UnionListWriter.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+<@pp.dropOutputFile />
+<@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/impl/UnionListWriter.java" />
+
+
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.vector.complex.impl;
+
+<#include "/@includes/vv_imports.ftl" />
+
+/*
+ * This class is generated using freemarker and the ${.template_name} template.
+ */
+
+@SuppressWarnings("unused")
+public class UnionListWriter extends AbstractFieldWriter {
+
+  ListVector vector;
+  UnionVector data;
+  UInt4Vector offsets;
+  private UnionWriter writer;
+  private boolean inMap = false;
+  private String mapName;
+  private int lastIndex = 0;
+
+  public UnionListWriter(ListVector vector) {
+    super(null);
+    this.vector = vector;
+    this.data = (UnionVector) vector.getDataVector();
+    this.writer = new UnionWriter(data);
+    this.offsets = vector.getOffsetVector();
+  }
+
+  @Override
+  public void allocate() {
+    vector.allocateNew();
+  }
+
+  @Override
+  public void clear() {
+    vector.clear();
+  }
+
+  @Override
+  public MaterializedField getField() {
+    return null;
+  }
+
+  @Override
+  public int getValueCapacity() {
+    return vector.getValueCapacity();
+  }
+
+  @Override
+  public void close() throws Exception {
+
+  }
+
+  <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+  <#assign fields = minor.fields!type.fields />
+  <#assign uncappedName = name?uncap_first/>
+
+  <#if !minor.class?starts_with("Decimal")>
+
+  @Override
+  public ${name}Writer <#if uncappedName == "int">integer<#else>${uncappedName}</#if>() {
+    return this;
+  }
+
+  @Override
+  public ${name}Writer <#if uncappedName == "int">integer<#else>${uncappedName}</#if>(String name) {
+    assert inMap;
+    mapName = name;
+    return this;
+  }
+
+  </#if>
+
+  </#list></#list>
+
+  @Override
+  public MapWriter map() {
+    inMap = true;
+    return this;
+  }
+
+  @Override
+  public ListWriter list() {
+    final int nextOffset = offsets.getAccessor().get(idx() + 1);
+    vector.getMutator().setNotNull(idx());
+    offsets.getMutator().setSafe(idx() + 1, nextOffset + 1);
+    writer.setPosition(nextOffset);
+    return writer;
+  }
+
+  @Override
+  public ListWriter list(String name) {
+    final int nextOffset = offsets.getAccessor().get(idx() + 1);
+    vector.getMutator().setNotNull(idx());
+    data.getMutator().setType(nextOffset, MinorType.MAP);
+    writer.setPosition(nextOffset);
+    ListWriter listWriter = writer.list(name);
+    return listWriter;
+  }
+
+  @Override
+  public MapWriter map(String name) {
+    MapWriter mapWriter = writer.map(name);
+    return mapWriter;
+  }
+
+  @Override
+  public void startList() {
+    vector.getMutator().startNewValue(idx());
+  }
+
+  @Override
+  public void endList() {
+
+  }
+
+  @Override
+  public void start() {
+    assert inMap;
+    final int nextOffset = offsets.getAccessor().get(idx() + 1);
+    vector.getMutator().setNotNull(idx());
+    data.getMutator().setType(nextOffset, MinorType.MAP);
+    offsets.getMutator().setSafe(idx() + 1, nextOffset);
+    writer.setPosition(nextOffset);
+  }
+
+  @Override
+  public void end() {
+    if (inMap) {
+      inMap = false;
+      final int nextOffset = offsets.getAccessor().get(idx() + 1);
+      offsets.getMutator().setSafe(idx() + 1, nextOffset + 1);
+    }
+  }
+
+  <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+  <#assign fields = minor.fields!type.fields />
+  <#assign uncappedName = name?uncap_first/>
+
+  <#if !minor.class?starts_with("Decimal")>
+
+  @Override
+  public void write${name}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>) {
+    if (inMap) {
+      final int nextOffset = offsets.getAccessor().get(idx() + 1);
+      vector.getMutator().setNotNull(idx());
+      data.getMutator().setType(nextOffset, MinorType.MAP);
+      writer.setPosition(nextOffset);
+      ${name}Writer ${uncappedName}Writer = writer.<#if uncappedName == "int">integer<#else>${uncappedName}</#if>(mapName);
+      ${uncappedName}Writer.write${name}(<#list fields as field>${field.name}<#if field_has_next>, </#if></#list>);
+    } else {
+      final int nextOffset = offsets.getAccessor().get(idx() + 1);
+      vector.getMutator().setNotNull(idx());
+      writer.setPosition(nextOffset);
+      writer.write${name}(<#list fields as field>${field.name}<#if field_has_next>, </#if></#list>);
+      offsets.getMutator().setSafe(idx() + 1, nextOffset + 1);
+    }
+  }
+
+  </#if>
+
+  </#list></#list>
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/codegen/templates/UnionReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/UnionReader.java b/exec/java-exec/src/main/codegen/templates/UnionReader.java
new file mode 100644
index 0000000..38d4247
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/templates/UnionReader.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.vector.complex.impl.NullReader;
+
+<@pp.dropOutputFile />
+<@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/impl/UnionReader.java" />
+
+
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.vector.complex.impl;
+
+<#include "/@includes/vv_imports.ftl" />
+
+@SuppressWarnings("unused")
+public class UnionReader extends AbstractFieldReader {
+
+  private BaseReader[] readers = new BaseReader[43];
+  public UnionVector data;
+  
+  public UnionReader(UnionVector data) {
+    this.data = data;
+  }
+
+  public MajorType getType() {
+    return Types.required(MinorType.valueOf(data.getTypeValue(idx())));
+  }
+
+  public boolean isSet(){
+    return !data.getAccessor().isNull(idx());
+  }
+
+  public void read(UnionHolder holder) {
+    holder.reader = this;
+    holder.isSet = this.isSet() ? 1 : 0;
+  }
+
+  public void read(int index, UnionHolder holder) {
+    getList().read(index, holder);
+  }
+
+  private FieldReader getReaderForIndex(int index) {
+    int typeValue = data.getTypeValue(index);
+    FieldReader reader = (FieldReader) readers[typeValue];
+    if (reader != null) {
+      return reader;
+    }
+    switch (typeValue) {
+    case 0:
+      return NullReader.INSTANCE;
+    case MinorType.MAP_VALUE:
+      return (FieldReader) getMap();
+    case MinorType.LIST_VALUE:
+      return (FieldReader) getList();
+    <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+    <#assign uncappedName = name?uncap_first/>
+    <#if !minor.class?starts_with("Decimal")>
+    case MinorType.${name?upper_case}_VALUE:
+      return (FieldReader) get${name}();
+    </#if>
+    </#list></#list>
+    default:
+      throw new UnsupportedOperationException("Unsupported type: " + MinorType.valueOf(typeValue));
+    }
+  }
+
+  private SingleMapReaderImpl mapReader;
+
+  private MapReader getMap() {
+    if (mapReader == null) {
+      mapReader = (SingleMapReaderImpl) data.getMap().getReader();
+      mapReader.setPosition(idx());
+      readers[MinorType.MAP_VALUE] = mapReader;
+    }
+    return mapReader;
+  }
+
+  private UnionListReader listReader;
+
+  private FieldReader getList() {
+    if (listReader == null) {
+      listReader = new UnionListReader(data.getList());
+      listReader.setPosition(idx());
+      readers[MinorType.LIST_VALUE] = listReader;
+    }
+    return listReader;
+  }
+
+  @Override
+  public java.util.Iterator<String> iterator() {
+    return getMap().iterator();
+  }
+
+  @Override
+  public void copyAsValue(UnionWriter writer) {
+    writer.data.copyFrom(idx(), writer.idx(), data);
+  }
+
+  <#list ["Object", "BigDecimal", "Integer", "Long", "Boolean",
+          "Character", "DateTime", "Period", "Double", "Float",
+          "Text", "String", "Byte", "Short", "byte[]"] as friendlyType>
+  <#assign safeType=friendlyType />
+  <#if safeType=="byte[]"><#assign safeType="ByteArray" /></#if>
+
+  @Override
+  public ${friendlyType} read${safeType}() {
+    return getReaderForIndex(idx()).read${safeType}();
+  }
+
+  </#list>
+
+  <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+          <#assign uncappedName = name?uncap_first/>
+  <#assign boxedType = (minor.boxedType!type.boxedType) />
+  <#assign javaType = (minor.javaType!type.javaType) />
+  <#assign friendlyType = (minor.friendlyType!minor.boxedType!type.boxedType) />
+  <#assign safeType=friendlyType />
+  <#if safeType=="byte[]"><#assign safeType="ByteArray" /></#if>
+  <#if !minor.class?starts_with("Decimal")>
+
+  private Nullable${name}ReaderImpl ${uncappedName}Reader;
+
+  private Nullable${name}ReaderImpl get${name}() {
+    if (${uncappedName}Reader == null) {
+      ${uncappedName}Reader = new Nullable${name}ReaderImpl(data.get${name}Vector());
+      ${uncappedName}Reader.setPosition(idx());
+      readers[MinorType.${name?upper_case}_VALUE] = ${uncappedName}Reader;
+    }
+    return ${uncappedName}Reader;
+  }
+
+  public void read(Nullable${name}Holder holder){
+    getReaderForIndex(idx()).read(holder);
+  }
+
+  public void copyAsValue(${name}Writer writer){
+    getReaderForIndex(idx()).copyAsValue(writer);
+  }
+  </#if>
+  </#list></#list>
+
+  @Override
+  public void setPosition(int index) {
+    super.setPosition(index);
+    for (BaseReader reader : readers) {
+      if (reader != null) {
+        reader.setPosition(index);
+      }
+    }
+  }
+  
+  public FieldReader reader(String name){
+    return getMap().reader(name);
+  }
+
+  public FieldReader reader() {
+    return getList().reader();
+  }
+
+  public boolean next() {
+    return getReaderForIndex(idx()).next();
+  }
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/codegen/templates/UnionVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/UnionVector.java b/exec/java-exec/src/main/codegen/templates/UnionVector.java
new file mode 100644
index 0000000..6a72757
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/templates/UnionVector.java
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+
+<@pp.dropOutputFile />
+<@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/impl/UnionVector.java" />
+
+
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.vector.complex.impl;
+
+<#include "/@includes/vv_imports.ftl" />
+import java.util.Iterator;
+import org.apache.drill.exec.vector.complex.impl.ComplexCopier;
+import org.apache.drill.exec.util.CallBack;
+
+/*
+ * This class is generated using freemarker and the ${.template_name} template.
+ */
+@SuppressWarnings("unused")
+
+
+public class UnionVector implements ValueVector {
+
+  private MaterializedField field;
+  private BufferAllocator allocator;
+  private Accessor accessor = new Accessor();
+  private Mutator mutator = new Mutator();
+  private int valueCount;
+
+  private MapVector internalMap;
+  private SingleMapWriter internalMapWriter;
+  private UInt1Vector typeVector;
+
+  private MapVector mapVector;
+  private ListVector listVector;
+  private NullableBigIntVector bigInt;
+  private NullableVarCharVector varChar;
+
+  private FieldReader reader;
+  private NullableBitVector bit;
+
+  private State state = State.INIT;
+  private int singleType = 0;
+  private ValueVector singleVector;
+
+  private enum State {
+    INIT, SINGLE, MULTI
+  }
+
+  public UnionVector(MaterializedField field, BufferAllocator allocator, CallBack callBack) {
+    this.field = field.clone();
+    this.allocator = allocator;
+    internalMap = new MapVector("internal", allocator, callBack);
+    internalMapWriter = new SingleMapWriter(internalMap, null, true, true);
+    this.typeVector = internalMap.addOrGet("types", Types.required(MinorType.UINT1), UInt1Vector.class);
+    this.field.addChild(internalMap.getField().clone());
+  }
+
+  private void updateState(ValueVector v) {
+    if (state == State.INIT) {
+      state = State.SINGLE;
+      singleVector = v;
+      singleType = v.getField().getType().getMinorType().getNumber();
+    } else {
+      state = State.MULTI;
+      singleVector = null;
+    }
+  }
+
+  public boolean isSingleType() {
+    return state == State.SINGLE && singleType != MinorType.LIST_VALUE;
+  }
+
+  public ValueVector getSingleVector() {
+    assert state != State.MULTI : "Cannot get single vector when there are multiple types";
+    assert state != State.INIT : "Cannot get single vector when there are no types";
+    return singleVector;
+  }
+
+  public MapVector getMap() {
+    if (mapVector == null) {
+      int vectorCount = internalMap.size();
+      mapVector = internalMap.addOrGet("map", Types.optional(MinorType.MAP), MapVector.class);
+      updateState(mapVector);
+      if (internalMap.size() > vectorCount) {
+        mapVector.allocateNew();
+      }
+    }
+    return mapVector;
+  }
+
+  <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+  <#assign fields = minor.fields!type.fields />
+  <#assign uncappedName = name?uncap_first/>
+  <#if !minor.class?starts_with("Decimal")>
+
+  private Nullable${name}Vector ${uncappedName}Vector;
+
+  public Nullable${name}Vector get${name}Vector() {
+    if (${uncappedName}Vector == null) {
+      int vectorCount = internalMap.size();
+      ${uncappedName}Vector = internalMap.addOrGet("${uncappedName}", Types.optional(MinorType.${name?upper_case}), Nullable${name}Vector.class);
+      updateState(${uncappedName}Vector);
+      if (internalMap.size() > vectorCount) {
+        ${uncappedName}Vector.allocateNew();
+      }
+    }
+    return ${uncappedName}Vector;
+  }
+
+  </#if>
+
+  </#list></#list>
+
+  public ListVector getList() {
+    if (listVector == null) {
+      int vectorCount = internalMap.size();
+      listVector = internalMap.addOrGet("list", Types.optional(MinorType.LIST), ListVector.class);
+      updateState(listVector);
+      if (internalMap.size() > vectorCount) {
+        listVector.allocateNew();
+      }
+    }
+    return listVector;
+  }
+
+  public int getTypeValue(int index) {
+    return typeVector.getAccessor().get(index);
+  }
+
+  public UInt1Vector getTypeVector() {
+    return typeVector;
+  }
+
+  @Override
+  public void allocateNew() throws OutOfMemoryRuntimeException {
+    internalMap.allocateNew();
+    if (typeVector != null) {
+      typeVector.zeroVector();
+    }
+  }
+
+  @Override
+  public boolean allocateNewSafe() {
+    boolean safe = internalMap.allocateNewSafe();
+    if (safe) {
+      if (typeVector != null) {
+        typeVector.zeroVector();
+      }
+    }
+    return safe;
+  }
+
+  @Override
+  public void setInitialCapacity(int numRecords) {
+  }
+
+  @Override
+  public int getValueCapacity() {
+    return Math.min(typeVector.getValueCapacity(), internalMap.getValueCapacity());
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public void clear() {
+    internalMap.clear();
+  }
+
+  @Override
+  public MaterializedField getField() {
+    return field;
+  }
+
+  @Override
+  public TransferPair getTransferPair() {
+    return new TransferImpl(field);
+  }
+
+  @Override
+  public TransferPair getTransferPair(FieldReference ref) {
+    return new TransferImpl(field.withPath(ref));
+  }
+
+  @Override
+  public TransferPair makeTransferPair(ValueVector target) {
+    return new TransferImpl((UnionVector) target);
+  }
+
+  public void transferTo(UnionVector target) {
+    internalMap.makeTransferPair(target.internalMap).transfer();
+    target.valueCount = valueCount;
+  }
+
+  public void copyFrom(int inIndex, int outIndex, UnionVector from) {
+    from.getReader().setPosition(inIndex);
+    getWriter().setPosition(outIndex);
+    ComplexCopier copier = new ComplexCopier(from.reader, mutator.writer);
+    copier.write();
+  }
+
+  public void copyFromSafe(int inIndex, int outIndex, UnionVector from) {
+    copyFrom(inIndex, outIndex, from);
+  }
+
+  private class TransferImpl implements TransferPair {
+
+    UnionVector to;
+
+    public TransferImpl(MaterializedField field) {
+      to = new UnionVector(field, allocator, null);
+    }
+
+    public TransferImpl(UnionVector to) {
+      this.to = to;
+    }
+
+    @Override
+    public void transfer() {
+      transferTo(to);
+    }
+
+    @Override
+    public void splitAndTransfer(int startIndex, int length) {
+
+    }
+
+    @Override
+    public ValueVector getTo() {
+      return to;
+    }
+
+    @Override
+    public void copyValueSafe(int from, int to) {
+      this.to.copyFrom(from, to, UnionVector.this);
+    }
+  }
+
+  @Override
+  public Accessor getAccessor() {
+    return accessor;
+  }
+
+  @Override
+  public Mutator getMutator() {
+    return mutator;
+  }
+
+  @Override
+  public FieldReader getReader() {
+    if (reader == null) {
+      reader = new UnionReader(this);
+    }
+    return reader;
+  }
+
+  public FieldWriter getWriter() {
+    if (mutator.writer == null) {
+      mutator.writer = new UnionWriter(this);
+    }
+    return mutator.writer;
+  }
+
+  @Override
+  public UserBitShared.SerializedField getMetadata() {
+    SerializedField.Builder b = getField() //
+            .getAsBuilder() //
+            .setBufferLength(getBufferSize()) //
+            .setValueCount(valueCount);
+
+    b.addChild(internalMap.getMetadata());
+    return b.build();
+  }
+
+  @Override
+  public int getBufferSize() {
+    return internalMap.getBufferSize();
+  }
+
+  @Override
+  public DrillBuf[] getBuffers(boolean clear) {
+    return internalMap.getBuffers(clear);
+  }
+
+  @Override
+  public void load(UserBitShared.SerializedField metadata, DrillBuf buffer) {
+    valueCount = metadata.getValueCount();
+
+    internalMap.load(metadata.getChild(0), buffer);
+  }
+
+  @Override
+  public Iterator<ValueVector> iterator() {
+    return null;
+  }
+
+  public class Accessor extends BaseValueVector.BaseAccessor {
+
+
+    @Override
+    public Object getObject(int index) {
+      int type = typeVector.getAccessor().get(index);
+      switch (type) {
+      case 0:
+        return null;
+      <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+      <#assign fields = minor.fields!type.fields />
+      <#assign uncappedName = name?uncap_first/>
+      <#if !minor.class?starts_with("Decimal")>
+      case MinorType.${name?upper_case}_VALUE:
+        return get${name}Vector().getAccessor().getObject(index);
+      </#if>
+
+      </#list></#list>
+      case MinorType.MAP_VALUE:
+        return getMap().getAccessor().getObject(index);
+      case MinorType.LIST_VALUE:
+        return getList().getAccessor().getObject(index);
+      default:
+        throw new UnsupportedOperationException("Cannot support type: " + MinorType.valueOf(type));
+      }
+    }
+
+    public byte[] get(int index) {
+      return null;
+    }
+
+    public void get(int index, ComplexHolder holder) {
+    }
+
+    public void get(int index, UnionHolder holder) {
+      if (reader == null) {
+        reader = new UnionReader(UnionVector.this);
+      }
+      reader.setPosition(index);
+      holder.reader = reader;
+    }
+
+    @Override
+    public int getValueCount() {
+      return valueCount;
+    }
+
+    @Override
+    public boolean isNull(int index) {
+      return typeVector.getAccessor().get(index) == 0;
+    }
+
+    public int isSet(int index) {
+      return isNull(index) ? 0 : 1;
+    }
+  }
+
+  public class Mutator extends BaseValueVector.BaseMutator {
+
+    UnionWriter writer;
+
+    @Override
+    public void setValueCount(int valueCount) {
+      UnionVector.this.valueCount = valueCount;
+      internalMap.getMutator().setValueCount(valueCount);
+    }
+
+    public void set(int index, byte[] bytes) {
+    }
+
+    public void setSafe(int index, UnionHolder holder) {
+      FieldReader reader = holder.reader;
+      if (writer == null) {
+        writer = new UnionWriter(UnionVector.this);
+      }
+      writer.setPosition(index);
+      MinorType type = reader.getType().getMinorType();
+      switch (type) {
+      <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+      <#assign fields = minor.fields!type.fields />
+      <#assign uncappedName = name?uncap_first/>
+      <#if !minor.class?starts_with("Decimal")>
+      case ${name?upper_case}:
+        Nullable${name}Holder ${uncappedName}Holder = new Nullable${name}Holder();
+        reader.read(${uncappedName}Holder);
+        if (holder.isSet == 1) {
+          writer.write${name}(<#list fields as field>${uncappedName}Holder.${field.name}<#if field_has_next>, </#if></#list>);
+        }
+        break;
+      </#if>
+      </#list></#list>
+      case MAP: {
+        ComplexCopier copier = new ComplexCopier(reader, writer);
+        copier.write();
+        break;
+      }
+      case LIST: {
+        ComplexCopier copier = new ComplexCopier(reader, writer);
+        copier.write();
+        break;
+      }
+      default:
+        throw new UnsupportedOperationException();
+      }
+    }
+
+    public void setType(int index, MinorType type) {
+      typeVector.getMutator().setSafe(index, type.getNumber());
+    }
+
+    @Override
+    public void reset() { }
+
+    @Override
+    public void generateTestData(int values) { }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/codegen/templates/UnionWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/UnionWriter.java b/exec/java-exec/src/main/codegen/templates/UnionWriter.java
new file mode 100644
index 0000000..9d77999
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/templates/UnionWriter.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+<@pp.dropOutputFile />
+<@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/impl/UnionWriter.java" />
+
+
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.vector.complex.impl;
+
+<#include "/@includes/vv_imports.ftl" />
+
+/*
+ * This class is generated using freemarker and the ${.template_name} template.
+ */
+@SuppressWarnings("unused")
+public class UnionWriter extends AbstractFieldWriter implements FieldWriter {
+
+  UnionVector data;
+  private MapWriter mapWriter;
+  private UnionListWriter listWriter;
+  private List<BaseWriter> writers = Lists.newArrayList();
+
+  public UnionWriter(BufferAllocator allocator) {
+    super(null);
+  }
+
+  public UnionWriter(UnionVector vector) {
+    super(null);
+    data = vector;
+  }
+
+  public UnionWriter(UnionVector vector, FieldWriter parent) {
+    super(null);
+    data = vector;
+  }
+
+  @Override
+  public void setPosition(int index) {
+    super.setPosition(index);
+    for (BaseWriter writer : writers) {
+      writer.setPosition(index);
+    }
+  }
+
+
+  @Override
+  public void start() {
+    data.getMutator().setType(idx(), MinorType.MAP);
+    getMapWriter(true).start();
+  }
+
+  @Override
+  public void end() {
+    getMapWriter(false).end();
+  }
+
+  @Override
+  public void startList() {
+    getListWriter(true).startList();
+    data.getMutator().setType(idx(), MinorType.LIST);
+  }
+
+  @Override
+  public void endList() {
+    getListWriter(true).endList();
+  }
+
+  private MapWriter getMapWriter(boolean create) {
+    if (create && mapWriter == null) {
+      mapWriter = new SingleMapWriter(data.getMap(), null, true, false);
+      mapWriter.setPosition(idx());
+      writers.add(mapWriter);
+    }
+    return mapWriter;
+  }
+
+  public MapWriter asMap() {
+    data.getMutator().setType(idx(), MinorType.MAP);
+    return getMapWriter(true);
+  }
+
+  private ListWriter getListWriter(boolean create) {
+    if (create && listWriter == null) {
+      listWriter = new UnionListWriter(data.getList());
+      listWriter.setPosition(idx());
+      writers.add(listWriter);
+    }
+    return listWriter;
+  }
+
+  public ListWriter asList() {
+    data.getMutator().setType(idx(), MinorType.LIST);
+    return getListWriter(true);
+  }
+
+  <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
+  <#assign fields = minor.fields!type.fields />
+  <#assign uncappedName = name?uncap_first/>
+
+          <#if !minor.class?starts_with("Decimal")>
+
+  private ${name}Writer ${name?uncap_first}Writer;
+
+  private ${name}Writer get${name}Writer(boolean create) {
+    if (create && ${uncappedName}Writer == null) {
+      ${uncappedName}Writer = new Nullable${name}WriterImpl(data.get${name}Vector(), null);
+      ${uncappedName}Writer.setPosition(idx());
+      writers.add(${uncappedName}Writer);
+    }
+    return ${uncappedName}Writer;
+  }
+
+  public ${name}Writer as${name}() {
+    data.getMutator().setType(idx(), MinorType.${name?upper_case});
+    return get${name}Writer(true);
+  }
+
+  @Override
+  public void write(${name}Holder holder) {
+    data.getMutator().setType(idx(), MinorType.${name?upper_case});
+    get${name}Writer(true).setPosition(idx());
+    get${name}Writer(true).write${name}(<#list fields as field>holder.${field.name}<#if field_has_next>, </#if></#list>);
+  }
+
+  public void write${minor.class}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>) {
+    data.getMutator().setType(idx(), MinorType.${name?upper_case});
+    get${name}Writer(true).setPosition(idx());
+    get${name}Writer(true).write${name}(<#list fields as field>${field.name}<#if field_has_next>, </#if></#list>);
+  }
+  </#if>
+
+  </#list></#list>
+
+  public void writeNull() {
+  }
+
+  @Override
+  public MapWriter map() {
+    data.getMutator().setType(idx(), MinorType.LIST);
+    getListWriter(true).setPosition(idx());
+    return getListWriter(true).map();
+  }
+
+  @Override
+  public ListWriter list() {
+    data.getMutator().setType(idx(), MinorType.LIST);
+    getListWriter(true).setPosition(idx());
+    return getListWriter(true).list();
+  }
+
+  @Override
+  public ListWriter list(String name) {
+    data.getMutator().setType(idx(), MinorType.MAP);
+    getMapWriter(true).setPosition(idx());
+    return getMapWriter(true).list(name);
+  }
+
+  @Override
+  public MapWriter map(String name) {
+    data.getMutator().setType(idx(), MinorType.MAP);
+    getMapWriter(true).setPosition(idx());
+    return getMapWriter(true).map(name);
+  }
+
+  <#list vv.types as type><#list type.minor as minor>
+  <#assign lowerName = minor.class?uncap_first />
+  <#if lowerName == "int" ><#assign lowerName = "integer" /></#if>
+  <#assign upperName = minor.class?upper_case />
+  <#assign capName = minor.class?cap_first />
+  <#if !minor.class?starts_with("Decimal")>
+  @Override
+  public ${capName}Writer ${lowerName}(String name) {
+    data.getMutator().setType(idx(), MinorType.MAP);
+    getMapWriter(true).setPosition(idx());
+    return getMapWriter(true).${lowerName}(name);
+  }
+
+  @Override
+  public ${capName}Writer ${lowerName}() {
+    data.getMutator().setType(idx(), MinorType.LIST);
+    getListWriter(true).setPosition(idx());
+    return getListWriter(true).${lowerName}();
+  }
+  </#if>
+  </#list></#list>
+
+  @Override
+  public void allocate() {
+    data.allocateNew();
+  }
+
+  @Override
+  public void clear() {
+    data.clear();
+  }
+
+  @Override
+  public void close() throws Exception {
+    data.close();
+  }
+
+  @Override
+  public MaterializedField getField() {
+    return data.getField();
+  }
+
+  @Override
+  public int getValueCapacity() {
+    return data.getValueCapacity();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index db7f68d..f85a776 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -160,6 +160,8 @@ public interface ExecConstants {
   public static String MONGO_READER_READ_NUMBERS_AS_DOUBLE = "store.mongo.read_numbers_as_double";
   public static OptionValidator MONGO_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR = new BooleanValidator(MONGO_READER_READ_NUMBERS_AS_DOUBLE, false);
 
+  public static BooleanValidator ENABLE_UNION_TYPE = new BooleanValidator("exec.enable_union_type", false);
+
   // TODO: We need to add a feature that enables storage plugins to add their own options. Currently we have to declare
   // in core which is not right. Move this option and above two mongo plugin related options once we have the feature.
   public static String HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS = "store.hive.optimize_scan_with_native_readers";

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
index df315b2..bc6d807 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
@@ -87,12 +87,12 @@ public class ExpressionTreeMaterializer {
   };
 
   public static LogicalExpression materialize(LogicalExpression expr, VectorAccessible batch, ErrorCollector errorCollector, FunctionLookupContext functionLookupContext) {
-    return ExpressionTreeMaterializer.materialize(expr, batch, errorCollector, functionLookupContext, false);
+    return ExpressionTreeMaterializer.materialize(expr, batch, errorCollector, functionLookupContext, false, false);
   }
 
   public static LogicalExpression materializeAndCheckErrors(LogicalExpression expr, VectorAccessible batch, FunctionLookupContext functionLookupContext) throws SchemaChangeException {
     ErrorCollector collector = new ErrorCollectorImpl();
-    LogicalExpression e = ExpressionTreeMaterializer.materialize(expr, batch, collector, functionLookupContext, false);
+    LogicalExpression e = ExpressionTreeMaterializer.materialize(expr, batch, collector, functionLookupContext, false, false);
     if (collector.hasErrors()) {
       throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
     }
@@ -100,8 +100,8 @@ public class ExpressionTreeMaterializer {
   }
 
   public static LogicalExpression materialize(LogicalExpression expr, VectorAccessible batch, ErrorCollector errorCollector, FunctionLookupContext functionLookupContext,
-      boolean allowComplexWriterExpr) {
-    LogicalExpression out =  expr.accept(new MaterializeVisitor(batch, errorCollector, allowComplexWriterExpr), functionLookupContext);
+      boolean allowComplexWriterExpr, boolean unionTypeEnabled) {
+    LogicalExpression out =  expr.accept(new MaterializeVisitor(batch, errorCollector, allowComplexWriterExpr, unionTypeEnabled), functionLookupContext);
 
     if (!errorCollector.hasErrors()) {
       out = out.accept(ConditionalExprOptimizer.INSTANCE, null);
@@ -190,11 +190,13 @@ public class ExpressionTreeMaterializer {
     private final ErrorCollector errorCollector;
     private final VectorAccessible batch;
     private final boolean allowComplexWriter;
+    private final boolean unionTypeEnabled;
 
-    public MaterializeVisitor(VectorAccessible batch, ErrorCollector errorCollector, boolean allowComplexWriter) {
+    public MaterializeVisitor(VectorAccessible batch, ErrorCollector errorCollector, boolean allowComplexWriter, boolean unionTypeEnabled) {
       this.batch = batch;
       this.errorCollector = errorCollector;
       this.allowComplexWriter = allowComplexWriter;
+      this.unionTypeEnabled = unionTypeEnabled;
     }
 
     private LogicalExpression validateNewExpr(LogicalExpression newExpr) {
@@ -323,23 +325,42 @@ public class ExpressionTreeMaterializer {
 
       MinorType thenType = conditions.expression.getMajorType().getMinorType();
       MinorType elseType = newElseExpr.getMajorType().getMinorType();
+      if (unionTypeEnabled) {
+        if (thenType != elseType && !(thenType == MinorType.NULL || elseType == MinorType.NULL)) {
+
+          MinorType leastRestrictive = TypeCastRules.getLeastRestrictiveType((Arrays.asList(thenType, elseType)));
+          if (leastRestrictive != thenType) {
+            // Implicitly cast the then expression
+            conditions = new IfExpression.IfCondition(newCondition,
+                    addCastExpression(conditions.expression, Types.optional(MinorType.UNION), functionLookupContext, errorCollector));
+          } else if (leastRestrictive != elseType) {
+            // Implicitly cast the else expression
+            newElseExpr = addCastExpression(newElseExpr, Types.optional(MinorType.UNION), functionLookupContext, errorCollector);
+          } else {
+            conditions = new IfExpression.IfCondition(newCondition,
+                    addCastExpression(conditions.expression, Types.optional(MinorType.UNION), functionLookupContext, errorCollector));
+            newElseExpr = addCastExpression(newElseExpr, Types.optional(MinorType.UNION), functionLookupContext, errorCollector);
+          }
+        }
 
-      // Check if we need a cast
-      if (thenType != elseType && !(thenType == MinorType.NULL || elseType == MinorType.NULL)) {
-
-        MinorType leastRestrictive = TypeCastRules.getLeastRestrictiveType((Arrays.asList(thenType, elseType)));
-        if (leastRestrictive != thenType) {
-          // Implicitly cast the then expression
-          conditions = new IfExpression.IfCondition(newCondition,
-          addCastExpression(conditions.expression, newElseExpr.getMajorType(), functionLookupContext, errorCollector));
-        } else if (leastRestrictive != elseType) {
-          // Implicitly cast the else expression
-          newElseExpr = addCastExpression(newElseExpr, conditions.expression.getMajorType(), functionLookupContext, errorCollector);
-        } else {
-          /* Cannot cast one of the two expressions to make the output type of if and else expression
-           * to be the same. Raise error.
-           */
-          throw new DrillRuntimeException("Case expression should have similar output type on all its branches");
+      } else {
+        // Check if we need a cast
+        if (thenType != elseType && !(thenType == MinorType.NULL || elseType == MinorType.NULL)) {
+
+          MinorType leastRestrictive = TypeCastRules.getLeastRestrictiveType((Arrays.asList(thenType, elseType)));
+          if (leastRestrictive != thenType) {
+            // Implicitly cast the then expression
+            conditions = new IfExpression.IfCondition(newCondition,
+            addCastExpression(conditions.expression, newElseExpr.getMajorType(), functionLookupContext, errorCollector));
+          } else if (leastRestrictive != elseType) {
+            // Implicitly cast the else expression
+            newElseExpr = addCastExpression(newElseExpr, conditions.expression.getMajorType(), functionLookupContext, errorCollector);
+          } else {
+            /* Cannot cast one of the two expressions to make the output type of if and else expression
+             * to be the same. Raise error.
+             */
+            throw new DrillRuntimeException("Case expression should have similar output type on all its branches");
+          }
         }
       }
 
@@ -374,19 +395,21 @@ public class ExpressionTreeMaterializer {
         }
       }
 
-      // If the type of the IF expression is nullable, apply a convertToNullable*Holder function for "THEN"/"ELSE"
-      // expressions whose type is not nullable.
-      if (IfExpression.newBuilder().setElse(newElseExpr).setIfCondition(conditions).build().getMajorType().getMode()
-          == DataMode.OPTIONAL) {
+      if (!unionTypeEnabled) {
+        // If the type of the IF expression is nullable, apply a convertToNullable*Holder function for "THEN"/"ELSE"
+        // expressions whose type is not nullable.
+        if (IfExpression.newBuilder().setElse(newElseExpr).setIfCondition(conditions).build().getMajorType().getMode()
+                == DataMode.OPTIONAL) {
           IfExpression.IfCondition condition = conditions;
           if (condition.expression.getMajorType().getMode() != DataMode.OPTIONAL) {
             conditions = new IfExpression.IfCondition(condition.condition, getConvertToNullableExpr(ImmutableList.of(condition.expression),
-                                                      condition.expression.getMajorType().getMinorType(), functionLookupContext));
-         }
+                    condition.expression.getMajorType().getMinorType(), functionLookupContext));
+          }
 
-        if (newElseExpr.getMajorType().getMode() != DataMode.OPTIONAL) {
-          newElseExpr = getConvertToNullableExpr(ImmutableList.of(newElseExpr),
-              newElseExpr.getMajorType().getMinorType(), functionLookupContext);
+          if (newElseExpr.getMajorType().getMode() != DataMode.OPTIONAL) {
+            newElseExpr = getConvertToNullableExpr(ImmutableList.of(newElseExpr),
+                    newElseExpr.getMajorType().getMinorType(), functionLookupContext);
+          }
         }
       }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/expr/GetSetVectorHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/GetSetVectorHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/GetSetVectorHelper.java
index a10c207..547f0f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/GetSetVectorHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/GetSetVectorHelper.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.expr;
 import io.netty.buffer.DrillBuf;
 
 import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
 
 import com.sun.codemodel.JBlock;
@@ -114,6 +115,9 @@ public class GetSetVectorHelper {
 
     JInvocation setMethod = vector.invoke("getMutator").invoke(setMethodName).arg(indexVariable);
 
+    if (type.getMinorType() == MinorType.UNION) {
+      return setMethod.arg(in.getHolder());
+    }
     switch(type.getMode()){
     case OPTIONAL:
       setMethod = setMethod.arg(in.f("isSet"));

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
index fea3774..1ec9d69 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
@@ -186,7 +186,7 @@ public abstract class DrillFuncHolder extends AbstractFuncHolder {
 
         ValueReference parameter = parameters[i];
         HoldingContainer inputVariable = inputVariables[i];
-        if (parameter.isFieldReader && ! inputVariable.isReader() && ! Types.isComplex(inputVariable.getMajorType())) {
+        if (parameter.isFieldReader && ! inputVariable.isReader() && ! Types.isComplex(inputVariable.getMajorType()) && inputVariable.getMinorType() != MinorType.UNION) {
           JType singularReaderClass = g.getModel()._ref(TypeHelper.getHolderReaderImpl(inputVariable.getMajorType().getMinorType(),
               inputVariable.getMajorType().getMode()));
           JType fieldReadClass = g.getModel()._ref(FieldReader.class);

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java
index e27234f..b7877df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java
@@ -46,7 +46,7 @@ public class MappifyUtility {
       throw new DrillRuntimeException("kvgen function only supports Simple maps as input");
     }
     BaseWriter.ListWriter listWriter = writer.rootAsList();
-    listWriter.start();
+    listWriter.startList();
     BaseWriter.MapWriter mapWriter = listWriter.map();
 
     // Iterate over the fields in the map
@@ -79,7 +79,7 @@ public class MappifyUtility {
 
       mapWriter.end();
     }
-    listWriter.end();
+    listWriter.endList();
 
     return buffer;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/UnionFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/UnionFunctions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/UnionFunctions.java
new file mode 100644
index 0000000..23a5004
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/UnionFunctions.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.fn.impl;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.holders.UnionHolder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.vector.complex.impl.UnionReader;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+
+import javax.inject.Inject;
+
+public class UnionFunctions {
+
+  @FunctionTemplate(names = {"typeString"},
+          scope = FunctionTemplate.FunctionScope.SIMPLE,
+          nulls = NullHandling.NULL_IF_NULL)
+  public static class FromType implements DrillSimpleFunc {
+
+    @Param
+    IntHolder in;
+    @Output
+    VarCharHolder out;
+    @Inject
+    DrillBuf buffer;
+
+    public void setup() {}
+
+    public void eval() {
+
+      VarCharHolder h = org.apache.drill.exec.vector.ValueHolderHelper.getVarCharHolder(buffer, org.apache.drill.common.types.MinorType.valueOf(in.value).toString());
+      out.buffer = h.buffer;
+      out.start = h.start;
+      out.end = h.end;
+    }
+  }
+
+  @FunctionTemplate(names = {"type"},
+          scope = FunctionTemplate.FunctionScope.SIMPLE,
+          nulls = NullHandling.NULL_IF_NULL)
+  public static class ToType implements DrillSimpleFunc {
+
+    @Param
+    VarCharHolder input;
+    @Output
+    IntHolder out;
+
+    public void setup() {}
+
+    public void eval() {
+
+      out.value = input.getType().getMinorType().getNumber();
+      byte[] b = new byte[input.end - input.start];
+      input.buffer.getBytes(input.start, b, 0, b.length);
+      String type = new String(b);
+      out.value = org.apache.drill.common.types.MinorType.valueOf(type.toUpperCase()).getNumber();
+    }
+  }
+
+  @FunctionTemplate(names = {"typeOf"},
+          scope = FunctionTemplate.FunctionScope.SIMPLE,
+          nulls = NullHandling.INTERNAL)
+  public static class GetType implements DrillSimpleFunc {
+
+    @Param
+    FieldReader input;
+    @Output
+    IntHolder out;
+
+    public void setup() {}
+
+    public void eval() {
+
+      out.value = input.isSet() ? input.getType().getMinorType().getNumber() : 0;
+
+    }
+  }
+
+  @SuppressWarnings("unused")
+  @FunctionTemplate(names = {"castUNION", "castToUnion"}, scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.NULL_IF_NULL)
+  public static class CastUnionToUnion implements DrillSimpleFunc{
+
+    @Param FieldReader in;
+    @Output
+    UnionHolder out;
+
+    public void setup() {}
+
+    public void eval() {
+      out.reader = in;
+      out.isSet = in.isSet() ? 1 : 0;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/UnionHolder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/UnionHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/UnionHolder.java
new file mode 100644
index 0000000..84cdefb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/UnionHolder.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.holders;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+
+public class UnionHolder implements ValueHolder {
+  public static final MajorType TYPE = Types.optional(MinorType.UNION);
+  public FieldReader reader;
+  public int isSet;
+
+  public MajorType getType() {
+    return reader.getType();
+  }
+
+  public boolean isSet() {
+    return isSet == 1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index 432e06b..fd16bc8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -176,7 +177,8 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
     final List<TransferPair> transfers = Lists.newArrayList();
     final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION2, context.getFunctionRegistry());
 
-    final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector, context.getFunctionRegistry());
+    final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector,
+            context.getFunctionRegistry(), false, unionTypeEnabled);
     if (collector.hasErrors()) {
       throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
     }


Mime
View raw message