drill-commits mailing list archives

From: s..@apache.org
Subject: [3/6] drill git commit: DRILL-3229: Implement Union type vector
Date: Tue, 03 Nov 2015 08:44:36 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 3a8b735..9f64626 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -288,7 +288,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     final IntOpenHashSet transferFieldIds = new IntOpenHashSet();
 
     final NamedExpression flattenExpr = new NamedExpression(popConfig.getColumn(), new FieldReference(popConfig.getColumn()));
-    final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression)ExpressionTreeMaterializer.materialize(flattenExpr.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
+    final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression)ExpressionTreeMaterializer.materialize(flattenExpr.getExpr(), incoming, collector, context.getFunctionRegistry(), true, false);
     final TransferPair tp = getFlattenFieldTransferPair(flattenExpr.getRef());
 
     if (tp != null) {
@@ -315,7 +315,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
         }
       }
 
-      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true, false);
       final MaterializedField outputField = MaterializedField.create(outputName, expr.getMajorType());
       if (collector.hasErrors()) {
         throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 5b5c90d..dfca892 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -37,6 +37,7 @@ import org.apache.drill.common.expression.fn.CastFunctions;
 import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -63,7 +64,6 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
 import com.carrotsearch.hppc.IntOpenHashSet;
@@ -381,7 +381,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         }
       }
 
-      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming,
+              collector, context.getFunctionRegistry(), true, unionTypeEnabled);
       final MaterializedField outputField = MaterializedField.create(outputName, expr.getMajorType());
       if (collector.hasErrors()) {
         throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
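
Both operators above now call the six-argument ExpressionTreeMaterializer.materialize; the trailing boolean is the new flag that allows UNION-typed results. A minimal sketch of the call shape, assuming the parameter types used elsewhere in this patch (the wrapper class and method are hypothetical):

    import org.apache.drill.common.expression.ErrorCollector;
    import org.apache.drill.common.expression.LogicalExpression;
    import org.apache.drill.common.logical.data.NamedExpression;
    import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
    import org.apache.drill.exec.ops.FragmentContext;
    import org.apache.drill.exec.record.RecordBatch;

    class MaterializeSketch {
      // ProjectRecordBatch passes its session-derived unionTypeEnabled flag;
      // FlattenRecordBatch passes false, leaving its behaviour unchanged.
      LogicalExpression materialize(NamedExpression ne, RecordBatch incoming, ErrorCollector collector,
          FragmentContext context, boolean unionTypeEnabled) {
        return ExpressionTreeMaterializer.materialize(
            ne.getExpr(), incoming, collector, context.getFunctionRegistry(),
            true,              // pre-existing boolean argument, unchanged by this patch
            unionTypeEnabled); // new argument: permit UNION-typed output
      }
    }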

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index d8f703e..aaa6f9e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -30,6 +31,7 @@ import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.server.options.OptionValue;
 
 public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements CloseableRecordBatch {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass());
@@ -39,6 +41,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   protected final FragmentContext context;
   protected final OperatorContext oContext;
   protected final OperatorStats stats;
+  protected final boolean unionTypeEnabled;
 
   protected BatchState state;
 
@@ -62,6 +65,12 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     } else {
       state = BatchState.FIRST;
     }
+    OptionValue option = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE.getOptionName());
+    if (option != null) {
+      unionTypeEnabled = option.bool_val;
+    } else {
+      unionTypeEnabled = false;
+    }
   }
 
   protected static enum BatchState {
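
The hunk above derives unionTypeEnabled from the new exec.enable_union_type session option. A hedged sketch of the two lookup styles this patch uses (the helper class is hypothetical; the calls mirror AbstractRecordBatch above and JSONRecordReader below):

    import org.apache.drill.exec.ExecConstants;
    import org.apache.drill.exec.ops.FragmentContext;
    import org.apache.drill.exec.server.options.OptionValue;

    class UnionOptionLookupSketch {
      // Name-based lookup, as above: may return null if the option has never been set.
      boolean isUnionTypeEnabled(FragmentContext context) {
        OptionValue option = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE.getOptionName());
        return option != null && option.bool_val;
      }

      // Validator-based lookup, as in JSONRecordReader below: the typed overload returns a boolean directly.
      boolean isUnionTypeEnabledTyped(FragmentContext context) {
        return context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
      }
    }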

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
index 6b1d178..21b3adc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
@@ -19,11 +19,15 @@ package org.apache.drill.exec.record;
 
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.MinorType;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 import org.apache.drill.exec.vector.complex.AbstractMapVector;
 import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.impl.UnionVector;
 
 public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleVectorWrapper.class);
@@ -103,6 +107,13 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper
     }
     PathSegment seg = expectedPath.getRootSegment();
 
+    if (v instanceof UnionVector) {
+      TypedFieldId.Builder builder = TypedFieldId.newBuilder();
+      builder.addId(id).remainder(expectedPath.getRootSegment().getChild());
+      builder.finalType(Types.optional(TypeProtos.MinorType.UNION));
+      builder.intermediateType(Types.optional(TypeProtos.MinorType.UNION));
+      return builder.build();
+    } else
     if (v instanceof AbstractContainerVector) {
       // we're looking for a multi path.
       AbstractContainerVector c = (AbstractContainerVector) v;

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/ResolverTypePrecedence.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/ResolverTypePrecedence.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/ResolverTypePrecedence.java
index 29dd6a5..8c602b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/ResolverTypePrecedence.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/ResolverTypePrecedence.java
@@ -77,6 +77,7 @@ public class ResolverTypePrecedence {
     precedenceMap.put(MinorType.INTERVALDAY, i+= 2);
     precedenceMap.put(MinorType.INTERVALYEAR, i+= 2);
     precedenceMap.put(MinorType.INTERVAL, i+= 2);
+    precedenceMap.put(MinorType.UNION, i += 2);
 
     MAX_IMPLICIT_CAST_COST = i;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java
index 3278d3e..7ee8ebe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java
@@ -24,6 +24,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
@@ -750,6 +752,8 @@ public class TypeCastRules {
     rule.add(MinorType.VARBINARY);
     rule.add(MinorType.FIXEDBINARY);
     rules.put(MinorType.VARBINARY, rule);
+
+    rules.put(MinorType.UNION, Sets.newHashSet(MinorType.UNION));
   }
 
   public static boolean isCastableWithNullHandling(MajorType from, MajorType to, NullHandling nullHandling) {
@@ -792,10 +796,16 @@ public class TypeCastRules {
   public static MinorType getLeastRestrictiveType(List<MinorType> types) {
     assert types.size() >= 2;
     MinorType result = types.get(0);
+    if (result == MinorType.UNION) {
+      return result;
+    }
     int resultPrec = ResolverTypePrecedence.precedenceMap.get(result);
 
     for (int i = 1; i < types.size(); i++) {
       MinorType next = types.get(i);
+      if (next == MinorType.UNION) {
+        return next;
+      }
       if (next == result) {
         // both args are of the same type; continue
         continue;
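
With the early returns added above, any UNION operand short-circuits implicit-cast resolution to UNION (presumably because a union column can hold values of any of the other branch types). A small hedged demo (class name hypothetical):

    import java.util.Arrays;
    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.exec.resolver.TypeCastRules;

    class LeastRestrictiveUnionDemo {
      public static void main(String[] args) {
        // Mixing an ordinary type with UNION now resolves to UNION instead of walking the precedence map.
        MinorType result = TypeCastRules.getLeastRestrictiveType(
            Arrays.asList(MinorType.BIGINT, MinorType.UNION));
        System.out.println(result); // expected: UNION
      }
    }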

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index b3bab2a..5e16483 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -92,6 +92,7 @@ public class SystemOptionManager extends BaseOptionManager {
       ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR,
       ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR,
       ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR,
+      ExecConstants.ENABLE_UNION_TYPE,
       ExecConstants.TEXT_ESTIMATED_ROW_SIZE,
       ExecConstants.JSON_EXTENDED_TYPES,
       ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR,
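
Registering ExecConstants.ENABLE_UNION_TYPE here is what makes the option settable per session. A hedged test-style sketch of toggling it, following the pattern the TestJsonReader changes below use (class name hypothetical; BaseTestQuery import path assumed):

    import org.apache.drill.BaseTestQuery;
    import org.junit.Test;

    public class UnionOptionToggleSketch extends BaseTestQuery {
      @Test
      public void selectWithUnionTypeEnabled() throws Exception {
        try {
          testNoResult("alter session set `exec.enable_union_type` = true");
          test("select * from cp.`jsoninput/union/a.json`");
        } finally {
          testNoResult("alter session set `exec.enable_union_type` = false");
        }
      }
    }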

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java
index 1a94452..e74021b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java
@@ -49,7 +49,7 @@ public class MapOrListWriter {
     if (map != null) {
       map.start();
     } else {
-      list.start();
+      list.startList();
     }
   }
 
@@ -57,7 +57,7 @@ public class MapOrListWriter {
     if (map != null) {
       map.end();
     } else {
-      list.end();
+      list.endList();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 4d51199..0e3c908 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -59,6 +59,7 @@ public class JSONRecordReader extends AbstractRecordReader {
   private final FragmentContext fragmentContext;
   private final boolean enableAllTextMode;
   private final boolean readNumbersAsDouble;
+  private final boolean unionEnabled;
 
   /**
    * Create a JSON Record Reader that uses a file based input stream.
@@ -108,6 +109,7 @@ public class JSONRecordReader extends AbstractRecordReader {
     // only enable all text mode if we aren't using embedded content mode.
     this.enableAllTextMode = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR);
     this.readNumbersAsDouble = fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE).bool_val;
+    this.unionEnabled = fragmentContext.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
     setColumns(columns);
   }
 
@@ -118,7 +120,7 @@ public class JSONRecordReader extends AbstractRecordReader {
         this.stream = fileSystem.openPossiblyCompressedStream(hadoopPath);
       }
 
-      this.writer = new VectorContainerWriter(output);
+      this.writer = new VectorContainerWriter(output, unionEnabled);
       if (isSkipQuery()) {
         this.jsonReader = new CountingJsonReader(fragmentContext.getManagedBuffer());
       } else {

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
index ea45653..9f7d2b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.store.JSONOutputRecordWriter;
 import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.vector.complex.fn.BasicJsonOutput;
 import org.apache.drill.exec.vector.complex.fn.ExtendedJsonOutput;
+import org.apache.drill.exec.vector.complex.fn.JsonWriter;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -129,6 +130,29 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
   }
 
   @Override
+  public FieldConverter getNewUnionConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new UnionJsonConverter(fieldId, fieldName, reader);
+  }
+
+  public class UnionJsonConverter extends FieldConverter {
+
+    public UnionJsonConverter(int fieldId, String fieldName, FieldReader reader) {
+      super(fieldId, fieldName, reader);
+    }
+
+    @Override
+    public void startField() throws IOException {
+      gen.writeFieldName(fieldName);
+    }
+
+    @Override
+    public void writeField() throws IOException {
+      JsonWriter writer = new JsonWriter(gen);
+      writer.write(reader);
+    }
+  }
+
+  @Override
   public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) {
     return new RepeatedMapJsonConverter(fieldId, fieldName, reader);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/UnionSqlAccessor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/UnionSqlAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/UnionSqlAccessor.java
new file mode 100644
index 0000000..ecf2596
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/UnionSqlAccessor.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.vector.complex.impl.UnionVector;
+import org.apache.drill.exec.vector.complex.impl.UnionWriter;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+public class UnionSqlAccessor extends AbstractSqlAccessor {
+
+  FieldReader reader;
+
+  public UnionSqlAccessor(UnionVector vector) {
+    reader = vector.getReader();
+  }
+
+  @Override
+  public boolean isNull(int rowOffset) {
+    reader.setPosition(rowOffset);
+    return reader.isSet();
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(int rowOffset) throws InvalidAccessException{
+    reader.setPosition(rowOffset);
+    return reader.readBigDecimal();
+  }
+
+  @Override
+  public boolean getBoolean(int rowOffset) throws InvalidAccessException{
+    reader.setPosition(rowOffset);
+    return reader.readBoolean();
+  }
+
+  @Override
+  public byte getByte(int rowOffset) throws InvalidAccessException{
+    reader.setPosition(rowOffset);
+    return reader.readByte();
+  }
+
+  @Override
+  public byte[] getBytes(int rowOffset) throws InvalidAccessException{
+    reader.setPosition(rowOffset);
+    return reader.readByteArray();
+  }
+
+  @Override
+  public double getDouble(int rowOffset) throws InvalidAccessException{
+    reader.setPosition(rowOffset);
+    return reader.readDouble();
+  }
+
+  @Override
+  public float getFloat(int rowOffset) throws InvalidAccessException{
+    reader.setPosition(rowOffset);
+    return reader.readFloat();
+  }
+
+  @Override
+  public int getInt(int rowOffset) throws InvalidAccessException{
+    reader.setPosition(rowOffset);
+    return reader.readInteger();
+  }
+
+  @Override
+  public long getLong(int rowOffset) throws InvalidAccessException{
+    reader.setPosition(rowOffset);
+    return reader.readLong();
+  }
+
+  @Override
+  public short getShort(int rowOffset) throws InvalidAccessException{
+    reader.setPosition(rowOffset);
+    return reader.readShort();
+  }
+
+  @Override
+  public char getChar(int rowOffset) throws InvalidAccessException{
+    reader.setPosition(rowOffset);
+    return reader.readCharacter();
+  }
+
+  @Override
+  public String getString(int rowOffset) throws InvalidAccessException{
+    reader.setPosition(rowOffset);
+    return getObject(rowOffset).toString();
+  }
+
+  @Override
+  public Object getObject(int rowOffset) throws InvalidAccessException {
+    reader.setPosition(rowOffset);
+    return reader.readObject();
+  }
+
+  @Override
+  public MajorType getType() {
+    return Types.optional(MinorType.UNION);
+  }
+
+  @Override
+  public Class<?> getObjectClass() {
+    return Object.class;
+  }
+}
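
The new accessor simply positions the vector's FieldReader at the requested row and reads back whatever that row holds. A hedged usage sketch (class name hypothetical; the broad throws clause stands in for InvalidAccessException):

    import org.apache.drill.exec.vector.accessor.UnionSqlAccessor;
    import org.apache.drill.exec.vector.complex.impl.UnionVector;

    class UnionAccessorSketch {
      Object readRow(UnionVector vector, int row) throws Exception {
        UnionSqlAccessor accessor = new UnionSqlAccessor(vector);
        // Returns a Java object of whatever type the row carries (e.g. Long, Double, String, map or list).
        return accessor.getObject(row);
      }
    }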

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
new file mode 100644
index 0000000..ccd6239
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
@@ -0,0 +1,292 @@
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.vector.complex;
+
+import com.google.common.collect.ObjectArrays;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.util.CallBack;
+import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.UInt1Vector;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VectorDescriptor;
+import org.apache.drill.exec.vector.complex.impl.ComplexCopier;
+import org.apache.drill.exec.vector.complex.impl.UnionListReader;
+import org.apache.drill.exec.vector.complex.impl.UnionListWriter;
+import org.apache.drill.exec.vector.complex.impl.UnionVector;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.exec.vector.complex.writer.FieldWriter;
+
+import java.util.List;
+
+public class ListVector extends BaseRepeatedValueVector {
+
+  UInt4Vector offsets;
+  protected final UInt1Vector bits;
+  Mutator mutator = new Mutator();
+  Accessor accessor = new Accessor();
+  UnionListWriter writer;
+  UnionListReader reader;
+
+  public ListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack) {
+    super(field, allocator, new UnionVector(field, allocator, callBack));
+    this.bits = new UInt1Vector(MaterializedField.create("$bits$", Types.required(MinorType.UINT1)), allocator);
+    offsets = getOffsetVector();
+    this.field.addChild(getDataVector().getField());
+    this.writer = new UnionListWriter(this);
+    this.reader = new UnionListReader(this);
+  }
+
+  public UnionListWriter getWriter() {
+    return writer;
+  }
+
+  @Override
+  public void allocateNew() throws OutOfMemoryRuntimeException {
+    super.allocateNewSafe();
+  }
+
+  public void transferTo(ListVector target) {
+    offsets.makeTransferPair(target.offsets).transfer();
+    bits.makeTransferPair(target.bits).transfer();
+    getDataVector().makeTransferPair(target.getDataVector()).transfer();
+  }
+
+  public void copyFrom(int inIndex, int outIndex, ListVector from) {
+    FieldReader in = from.getReader();
+    in.setPosition(inIndex);
+    FieldWriter out = getWriter();
+    out.setPosition(outIndex);
+    ComplexCopier copier = new ComplexCopier(in, out);
+    copier.write();
+  }
+
+  @Override
+  public UnionVector getDataVector() {
+    return (UnionVector) vector;
+  }
+
+  @Override
+  public TransferPair getTransferPair(FieldReference ref) {
+    return new TransferImpl(field.withPath(ref));
+  }
+
+  @Override
+  public TransferPair makeTransferPair(ValueVector target) {
+    return new TransferImpl((ListVector) target);
+  }
+
+  private class TransferImpl implements TransferPair {
+
+    ListVector to;
+
+    public TransferImpl(MaterializedField field) {
+      to = new ListVector(field, allocator, null);
+    }
+
+    public TransferImpl(ListVector to) {
+      this.to = to;
+    }
+
+    @Override
+    public void transfer() {
+      transferTo(to);
+    }
+
+    @Override
+    public void splitAndTransfer(int startIndex, int length) {
+
+    }
+
+    @Override
+    public ValueVector getTo() {
+      return to;
+    }
+
+    @Override
+    public void copyValueSafe(int from, int to) {
+      this.to.copyFrom(from, to, ListVector.this);
+    }
+  }
+
+  @Override
+  public Accessor getAccessor() {
+    return accessor;
+  }
+
+  @Override
+  public Mutator getMutator() {
+    return mutator;
+  }
+
+  @Override
+  public FieldReader getReader() {
+    return reader;
+  }
+
+  @Override
+  public boolean allocateNewSafe() {
+    /* boolean to keep track if all the memory allocation were successful
+     * Used in the case of composite vectors when we need to allocate multiple
+     * buffers for multiple vectors. If one of the allocations failed we need to
+     * clear all the memory that we allocated
+     */
+    boolean success = false;
+    try {
+      if (!offsets.allocateNewSafe()) {
+        return false;
+      }
+      success = vector.allocateNewSafe();
+      success = success && bits.allocateNewSafe();
+    } finally {
+      if (!success) {
+        clear();
+      }
+    }
+    offsets.zeroVector();
+    bits.zeroVector();
+    return success;
+  }
+
+  @Override
+  protected UserBitShared.SerializedField.Builder getMetadataBuilder() {
+    return getField().getAsBuilder()
+            .setValueCount(getAccessor().getValueCount())
+            .setBufferLength(getBufferSize())
+            .addChild(offsets.getMetadata())
+            .addChild(bits.getMetadata())
+            .addChild(vector.getMetadata());
+  }
+
+  @Override
+  public int getBufferSize() {
+    if (getAccessor().getValueCount() == 0) {
+      return 0;
+    }
+    return offsets.getBufferSize() + bits.getBufferSize() + vector.getBufferSize();
+  }
+
+  @Override
+  public void clear() {
+    offsets.clear();
+    vector.clear();
+    bits.clear();
+    lastSet = 0;
+    super.clear();
+  }
+
+  @Override
+  public DrillBuf[] getBuffers(boolean clear) {
+    final DrillBuf[] buffers = ObjectArrays.concat(offsets.getBuffers(false), ObjectArrays.concat(bits.getBuffers(false),
+            vector.getBuffers(false), DrillBuf.class), DrillBuf.class);
+    if (clear) {
+      for (DrillBuf buffer:buffers) {
+        buffer.retain();
+      }
+      clear();
+    }
+    return buffers;
+  }
+
+  @Override
+  public void load(UserBitShared.SerializedField metadata, DrillBuf buffer) {
+    final UserBitShared.SerializedField offsetMetadata = metadata.getChild(0);
+    offsets.load(offsetMetadata, buffer);
+
+    final int offsetLength = offsetMetadata.getBufferLength();
+    final UserBitShared.SerializedField bitMetadata = metadata.getChild(1);
+    final int bitLength = bitMetadata.getBufferLength();
+    bits.load(bitMetadata, buffer.slice(offsetLength, bitLength));
+
+    final UserBitShared.SerializedField vectorMetadata = metadata.getChild(2);
+    if (getDataVector() == DEFAULT_DATA_VECTOR) {
+      addOrGetVector(VectorDescriptor.create(vectorMetadata.getMajorType()));
+    }
+
+    final int vectorLength = vectorMetadata.getBufferLength();
+    vector.load(vectorMetadata, buffer.slice(offsetLength + bitLength, vectorLength));
+  }
+
+  private int lastSet;
+
+  public class Accessor extends BaseRepeatedAccessor {
+
+    @Override
+    public Object getObject(int index) {
+      if (bits.getAccessor().isNull(index)) {
+        return null;
+      }
+      final List<Object> vals = new JsonStringArrayList<>();
+      final UInt4Vector.Accessor offsetsAccessor = offsets.getAccessor();
+      final int start = offsetsAccessor.get(index);
+      final int end = offsetsAccessor.get(index + 1);
+      final UnionVector.Accessor valuesAccessor = getDataVector().getAccessor();
+      for(int i = start; i < end; i++) {
+        vals.add(valuesAccessor.getObject(i));
+      }
+      return vals;
+    }
+
+    @Override
+    public boolean isNull(int index) {
+      return bits.getAccessor().get(index) == 0;
+    }
+  }
+
+  public class Mutator extends BaseRepeatedMutator {
+    public void setNotNull(int index) {
+      bits.getMutator().setSafe(index, 1);
+      lastSet = index + 1;
+    }
+
+    @Override
+    public void startNewValue(int index) {
+      for (int i = lastSet; i <= index; i++) {
+        offsets.getMutator().setSafe(i + 1, offsets.getAccessor().get(i));
+      }
+      setNotNull(index);
+      lastSet = index + 1;
+    }
+
+    @Override
+    public void setValueCount(int valueCount) {
+      // TODO: populate offset end points
+      if (valueCount == 0) {
+        offsets.getMutator().setValueCount(0);
+      } else {
+        for (int i = lastSet; i < valueCount; i++) {
+          offsets.getMutator().setSafe(i + 1, offsets.getAccessor().get(i));
+        }
+        offsets.getMutator().setValueCount(valueCount + 1);
+      }
+      final int childValueCount = valueCount == 0 ? 0 : offsets.getAccessor().get(valueCount);
+      vector.getMutator().setValueCount(childValueCount);
+      bits.getMutator().setValueCount(valueCount);
+    }
+  }
+}
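
For clarity, a hedged restatement of ListVector.Accessor.getObject above as a standalone helper (the method itself is hypothetical; the field roles are as in the class): offsets[i]..offsets[i+1] delimit entry i's slice of the union data vector, and bits flags null entries.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.drill.exec.vector.UInt1Vector;
    import org.apache.drill.exec.vector.UInt4Vector;
    import org.apache.drill.exec.vector.complex.impl.UnionVector;

    class ListVectorReadSketch {
      List<Object> readEntry(UInt4Vector offsets, UInt1Vector bits, UnionVector data, int index) {
        if (bits.getAccessor().get(index) == 0) {
          return null;                                          // entry was never set
        }
        final int start = offsets.getAccessor().get(index);     // first child of this entry
        final int end = offsets.getAccessor().get(index + 1);   // one past the last child
        final List<Object> vals = new ArrayList<>();
        for (int i = start; i < end; i++) {
          vals.add(data.getAccessor().getObject(i));            // each child keeps its own runtime type
        }
        return vals;
      }
    }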

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 603776d..d55b1d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -275,83 +275,86 @@ public class JsonReader extends BaseJsonProcessor {
   private void writeData(MapWriter map, FieldSelection selection, boolean moveForward) throws IOException {
     //
     map.start();
-    outside: while (true) {
-
-      JsonToken t;
-      if(moveForward){
-        t = parser.nextToken();
-      }else{
-        t = parser.getCurrentToken();
-        moveForward = true;
-      }
+    try {
+      outside:
+      while (true) {
+
+        JsonToken t;
+        if (moveForward) {
+          t = parser.nextToken();
+        } else {
+          t = parser.getCurrentToken();
+          moveForward = true;
+        }
 
-      if (t == JsonToken.NOT_AVAILABLE || t == JsonToken.END_OBJECT) {
-        return;
-      }
+        if (t == JsonToken.NOT_AVAILABLE || t == JsonToken.END_OBJECT) {
+          return;
+        }
 
-      assert t == JsonToken.FIELD_NAME : String.format("Expected FIELD_NAME but got %s.", t.name());
+        assert t == JsonToken.FIELD_NAME : String.format("Expected FIELD_NAME but got %s.", t.name());
 
-      final String fieldName = parser.getText();
-      this.currentFieldName = fieldName;
-      FieldSelection childSelection = selection.getChild(fieldName);
-      if (childSelection.isNeverValid()) {
-        consumeEntireNextValue();
-        continue outside;
-      }
-
-      switch (parser.nextToken()) {
-      case START_ARRAY:
-        writeData(map.list(fieldName));
-        break;
-      case START_OBJECT:
-        if (!writeMapDataIfTyped(map, fieldName)) {
-          writeData(map.map(fieldName), childSelection, false);
+        final String fieldName = parser.getText();
+        this.currentFieldName = fieldName;
+        FieldSelection childSelection = selection.getChild(fieldName);
+        if (childSelection.isNeverValid()) {
+          consumeEntireNextValue();
+          continue outside;
         }
-        break;
-      case END_OBJECT:
-        break outside;
 
-      case VALUE_FALSE: {
-        map.bit(fieldName).writeBit(0);
-        atLeastOneWrite = true;
-        break;
-      }
-      case VALUE_TRUE: {
-        map.bit(fieldName).writeBit(1);
-        atLeastOneWrite = true;
-        break;
-      }
-      case VALUE_NULL:
-        // do nothing as we don't have a type.
-        break;
-      case VALUE_NUMBER_FLOAT:
-        map.float8(fieldName).writeFloat8(parser.getDoubleValue());
-        atLeastOneWrite = true;
-        break;
-      case VALUE_NUMBER_INT:
-        if (this.readNumbersAsDouble) {
-          map.float8(fieldName).writeFloat8(parser.getDoubleValue());
+        switch (parser.nextToken()) {
+        case START_ARRAY:
+          writeData(map.list(fieldName));
+          break;
+        case START_OBJECT:
+          if (!writeMapDataIfTyped(map, fieldName)) {
+            writeData(map.map(fieldName), childSelection, false);
+          }
+          break;
+        case END_OBJECT:
+          break outside;
+
+        case VALUE_FALSE: {
+          map.bit(fieldName).writeBit(0);
+          atLeastOneWrite = true;
+          break;
         }
-        else {
-          map.bigInt(fieldName).writeBigInt(parser.getLongValue());
+        case VALUE_TRUE: {
+          map.bit(fieldName).writeBit(1);
+          atLeastOneWrite = true;
+          break;
+        }
+        case VALUE_NULL:
+          // do nothing as we don't have a type.
+          break;
+        case VALUE_NUMBER_FLOAT:
+          map.float8(fieldName).writeFloat8(parser.getDoubleValue());
+          atLeastOneWrite = true;
+          break;
+        case VALUE_NUMBER_INT:
+          if (this.readNumbersAsDouble) {
+            map.float8(fieldName).writeFloat8(parser.getDoubleValue());
+          } else {
+            map.bigInt(fieldName).writeBigInt(parser.getLongValue());
+          }
+          atLeastOneWrite = true;
+          break;
+        case VALUE_STRING:
+          handleString(parser, map, fieldName);
+          atLeastOneWrite = true;
+          break;
+
+        default:
+          throw
+                  getExceptionWithContext(
+                          UserException.dataReadError(), currentFieldName, null)
+                          .message("Unexpected token %s", parser.getCurrentToken())
+                          .build(logger);
         }
-        atLeastOneWrite = true;
-        break;
-      case VALUE_STRING:
-        handleString(parser, map, fieldName);
-        atLeastOneWrite = true;
-        break;
 
-      default:
-        throw
-          getExceptionWithContext(
-            UserException.dataReadError(), currentFieldName, null)
-          .message("Unexpected token %s", parser.getCurrentToken())
-          .build(logger);
       }
-
+    } finally {
+      map.end();
     }
-    map.end();
 
   }
 
@@ -463,8 +466,8 @@ public class JsonReader extends BaseJsonProcessor {
     writer.varChar().writeVarChar(0, workingBuffer.prepareVarCharHolder(parser.getText()), workingBuffer.getBuf());
   }
 
-  private void writeData(ListWriter list) {
-    list.start();
+  private void writeData(ListWriter list) throws IOException {
+    list.startList();
     outside: while (true) {
       try {
       switch (parser.nextToken()) {
@@ -523,12 +526,12 @@ public class JsonReader extends BaseJsonProcessor {
       throw getExceptionWithContext(e, this.currentFieldName, null).build(logger);
     }
     }
-    list.end();
+    list.endList();
 
   }
 
   private void writeDataAllText(ListWriter list) throws IOException {
-    list.start();
+    list.startList();
     outside: while (true) {
 
       switch (parser.nextToken()) {
@@ -562,7 +565,7 @@ public class JsonReader extends BaseJsonProcessor {
           .build(logger);
       }
     }
-    list.end();
+    list.endList();
 
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java
index 8309bf3..6ff7d90 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java
@@ -47,6 +47,10 @@ public class JsonWriter {
 
   }
 
+  public JsonWriter(JsonOutput gen) {
+    this.gen = gen;
+  }
+
   public void write(FieldReader reader) throws JsonGenerationException, IOException{
     writeValue(reader);
     gen.flush();
@@ -109,7 +113,11 @@ public class JsonWriter {
 
       case LIST:
         // this is a pseudo class, doesn't actually contain the real reader so we have to drop down.
-        writeValue(reader.reader());
+        gen.writeStartArray();
+        while (reader.next()) {
+          writeValue(reader.reader());
+        }
+        gen.writeEndArray();
         break;
       case MAP:
         gen.writeStartObject();
@@ -125,6 +133,7 @@ public class JsonWriter {
         gen.writeEndObject();
         break;
       case NULL:
+      case LATE:
         gen.writeUntypedNull();
         break;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java
index 22addc9..fea326c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
 
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.holders.UnionHolder;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
@@ -71,4 +72,20 @@ abstract class AbstractBaseReader implements FieldReader{
   public int size() {
     throw new IllegalStateException("The current reader doesn't support getting size information.");
   }
+
+  @Override
+  public void read(UnionHolder holder) {
+    holder.reader = this;
+    holder.isSet = this.isSet() ? 1 : 0;
+  }
+
+  @Override
+  public void read(int index, UnionHolder holder) {
+    throw new IllegalStateException("The current reader doesn't support reading union type");
+  }
+
+  @Override
+  public void copyAsValue(UnionWriter writer) {
+    throw new IllegalStateException("The current reader doesn't support reading union type");
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
index 88a56f8..23bcfcb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
@@ -38,13 +38,19 @@ public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWri
 
   Mode mode = Mode.INIT;
   private final String name;
+  private final boolean unionEnabled;
 
   private enum Mode { INIT, MAP, LIST };
 
-  public ComplexWriterImpl(String name, MapVector container){
+  public ComplexWriterImpl(String name, MapVector container, boolean unionEnabled){
     super(null);
     this.name = name;
     this.container = container;
+    this.unionEnabled = unionEnabled;
+  }
+
+  public ComplexWriterImpl(String name, MapVector container){
+    this(name, container, false);
   }
 
   @Override
@@ -120,7 +126,7 @@ public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWri
 
     case INIT:
       MapVector map = (MapVector) container;
-      mapRoot = new SingleMapWriter(map, this);
+      mapRoot = new SingleMapWriter(map, this, unionEnabled, false);
       mapRoot.setPosition(idx());
       mode = Mode.MAP;
       break;
@@ -141,7 +147,7 @@ public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWri
 
     case INIT:
       MapVector map = container.addOrGet(name, Types.required(MinorType.MAP), MapVector.class);
-      mapRoot = new SingleMapWriter(map, this);
+      mapRoot = new SingleMapWriter(map, this, unionEnabled, false);
       mapRoot.setPosition(idx());
       mode = Mode.MAP;
       break;

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/UnionListReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/UnionListReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/UnionListReader.java
new file mode 100644
index 0000000..fef3325
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/UnionListReader.java
@@ -0,0 +1,90 @@
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.vector.complex.impl;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.expr.holders.UnionHolder;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.complex.ListVector;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+
+public class UnionListReader extends AbstractFieldReader {
+
+  private ListVector vector;
+  private UnionVector data;
+  private UInt4Vector offsets;
+  private UnionReader reader;
+
+  public UnionListReader(ListVector vector) {
+    this.vector = vector;
+    this.data = vector.getDataVector();
+    this.offsets = vector.getOffsetVector();
+    this.reader = (UnionReader) data.getReader();
+  }
+
+  @Override
+  public boolean isSet() {
+    return true;
+  }
+
+  @Override
+  public MajorType getType() {
+    return reader.getType();
+  }
+
+  private int currentOffset;
+  private int maxOffset;
+
+  @Override
+  public void setPosition(int index) {
+    super.setPosition(index);
+    currentOffset = offsets.getAccessor().get(index) - 1;
+    maxOffset = offsets.getAccessor().get(index + 1);
+  }
+
+  @Override
+  public FieldReader reader() {
+    return reader;
+  }
+
+  @Override
+  public Object readObject() {
+    return vector.getAccessor().getObject(idx());
+  }
+
+  @Override
+  public void read(int index, UnionHolder holder) {
+    setPosition(idx());
+    for (int i = -1; i < index; i++) {
+      next();
+    }
+    holder.reader = reader;
+    holder.isSet = reader.isSet() ? 1 : 0;
+  }
+
+  @Override
+  public boolean next() {
+    if (currentOffset + 1 < maxOffset) {
+      reader.setPosition(++currentOffset);
+      return true;
+    } else {
+      return false;
+    }
+  }
+}
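
A hedged sketch of how a caller walks one list entry through UnionListReader, i.e. the same next()/reader() loop the JsonWriter LIST case above now uses (class and method are hypothetical):

    import org.apache.drill.exec.vector.complex.ListVector;
    import org.apache.drill.exec.vector.complex.impl.UnionListReader;
    import org.apache.drill.exec.vector.complex.reader.FieldReader;

    class UnionListIterationSketch {
      void printEntry(ListVector vector, int row) {
        UnionListReader listReader = new UnionListReader(vector);
        listReader.setPosition(row);            // offsets for this row bound the iteration
        while (listReader.next()) {             // advances the inner UnionReader one element at a time
          FieldReader element = listReader.reader();
          System.out.println(element.readObject());
        }
      }
    }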

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
index 95c651f..6bc2e05 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
@@ -33,11 +33,15 @@ public class VectorContainerWriter extends AbstractFieldWriter implements Comple
   private final SpecialMapVector mapVector;
   private final OutputMutator mutator;
 
-  public VectorContainerWriter(OutputMutator mutator) {
+  public VectorContainerWriter(OutputMutator mutator, boolean unionEnabled) {
     super(null);
     this.mutator = mutator;
     mapVector = new SpecialMapVector(mutator.getCallBack());
-    mapRoot = new SingleMapWriter(mapVector, this);
+    mapRoot = new SingleMapWriter(mapVector, this, unionEnabled, false);
+  }
+
+  public VectorContainerWriter(OutputMutator mutator) {
+    this(mutator, false);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
index e3591b6..51b909b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
@@ -98,4 +98,8 @@ public class TestOutputMutator implements OutputMutator, Iterable<VectorWrapper<
     return null;
   }
 
+  public VectorContainer getContainer() {
+    return container;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
index bd9cea1..0ae8945 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
@@ -104,6 +104,7 @@ public class TestJsonReader extends BaseTestQuery {
         .baselineValues(listOf(testVal))
         .go();
 
+    test("select flatten(config) as flat from cp.`/store/json/null_list_v2.json`");
     testBuilder()
         .sqlQuery("select flatten(config) as flat from cp.`/store/json/null_list_v2.json`")
         .ordered()
@@ -377,5 +378,108 @@ public class TestJsonReader extends BaseTestQuery {
     assertEquals("[4,5,6]", vw.getValueVector().getAccessor().getObject(2).toString());
   }
 
+  @Test
+  public void testSelectStarWithUnionType() throws Exception {
+    try {
+      String query = "select * from cp.`jsoninput/union/a.json`";
+      testBuilder()
+              .sqlQuery(query)
+              .ordered()
+              .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+              .baselineColumns("field1", "field2")
+              .baselineValues(
+                      1L, 1.2
+              )
+              .baselineValues(
+                      listOf(2L), 1.2
+              )
+              .baselineValues(
+                      mapOf("inner1", 3L, "inner2", 4L), listOf(3L, 4.0, "5")
+              )
+              .baselineValues(
+                      mapOf("inner1", 3L,
+                              "inner2", listOf(
+                                      mapOf(
+                                              "innerInner1", 1L,
+                                              "innerInner2",
+                                              listOf(
+                                                      3L,
+                                                      "a"
+                                              )
+                                      )
+                              )
+                      ),
+                      listOf(
+                              mapOf("inner3", 7L),
+                              4.0,
+                              "5",
+                              mapOf("inner4", 9L),
+                              listOf(
+                                      mapOf(
+                                              "inner5", 10L,
+                                              "inner6", 11L
+                                      ),
+                                      mapOf(
+                                              "inner5", 12L,
+                                              "inner7", 13L
+                                      )
+                              )
+                      )
+              ).go();
+    } finally {
+      testNoResult("alter session set `exec.enable_union_type` = false");
+    }
+  }
+
+  @Test
+  public void testSelectFromListWithCase() throws Exception {
+    String query = "select a from (select case when typeOf(field2) = type('list') then asBigInt(field2[4][1].inner7) end a from cp.`jsoninput/union/a.json`) where a is not null";
+    try {
+      testBuilder()
+              .sqlQuery(query)
+              .ordered()
+              .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+              .baselineColumns("a")
+              .baselineValues(13L)
+              .go();
+    } finally {
+      testNoResult("alter session set `exec.enable_union_type` = false");
+    }
+  }
+
+  @Test
+  public void testTypeCase() throws Exception {
+    String query = "select case typeOf(field1) when type('bigint') then asBigInt(field1) when type('list') then asBigInt(field1[0]) when type('map') then asBigInt(t.field1.inner1) end f1 from cp.`jsoninput/union/a.json` t";
+    try {
+      testBuilder()
+              .sqlQuery(query)
+              .ordered()
+              .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+              .baselineColumns("f1")
+              .baselineValues(1L)
+              .baselineValues(2L)
+              .baselineValues(3L)
+              .baselineValues(3L)
+              .go();
+    } finally {
+      testNoResult("alter session set `exec.enable_union_type` = false");
+    }
+  }
+
+  @Test
+  public void testSumWithTypeCase() throws Exception {
+    String query = "select sum(f1) sum_f1 from (select case typeOf(field1) when type('bigint') then asBigInt(field1) when type('list') then asBigInt(field1[0]) when type('map') then asBigInt(t.field1.inner1) end f1 from cp.`jsoninput/union/a.json` t)";
+    try {
+      testBuilder()
+              .sqlQuery(query)
+              .ordered()
+              .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
+              .baselineColumns("sum_f1")
+              .baselineValues(9L)
+              .go();
+    } finally {
+      testNoResult("alter session set `exec.enable_union_type` = false");
+    }
+  }
 
 }
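
For orientation, the baselines in testTypeCase and testSumWithTypeCase follow directly from the four records of jsoninput/union/a.json added at the end of this patch; a comment-only restatement of the per-row evaluation:

    // Row 1: field1 = 1                (bigint) -> asBigInt(field1)          = 1
    // Row 2: field1 = [2]              (list)   -> asBigInt(field1[0])       = 2
    // Row 3: field1 = {inner1: 3, ...} (map)    -> asBigInt(t.field1.inner1) = 3
    // Row 4: field1 = {inner1: 3, ...} (map)    -> asBigInt(t.field1.inner1) = 3
    // testSumWithTypeCase therefore expects sum_f1 = 1 + 2 + 3 + 3 = 9.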

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java
index df0ce14..bd4731a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java
@@ -139,12 +139,12 @@ public class TestRepeated {
     {
       final MapWriter map = writer.rootAsMap();
       final ListWriter list = map.list("a");
-      list.start();
+      list.startList();
 
       final ListWriter innerList = list.list();
       final IntWriter innerInt = innerList.integer();
 
-      innerList.start();
+      innerList.startList();
 
       final IntHolder holder = new IntHolder();
 
@@ -155,16 +155,16 @@ public class TestRepeated {
       holder.value = 3;
       innerInt.write(holder);
 
-      innerList.end();
-      innerList.start();
+      innerList.endList();
+      innerList.startList();
 
       holder.value = 4;
       innerInt.write(holder);
       holder.value = 5;
       innerInt.write(holder);
 
-      innerList.end();
-      list.end();
+      innerList.endList();
+      list.endList();
 
       final IntWriter numCol = map.integer("nums");
       holder.value = 14;
@@ -192,12 +192,12 @@ public class TestRepeated {
 
       final MapWriter map = writer.rootAsMap();
       final ListWriter list = map.list("a");
-      list.start();
+      list.startList();
 
       final ListWriter innerList = list.list();
       final IntWriter innerInt = innerList.integer();
 
-      innerList.start();
+      innerList.startList();
 
       final IntHolder holder = new IntHolder();
 
@@ -208,16 +208,16 @@ public class TestRepeated {
       holder.value = -3;
       innerInt.write(holder);
 
-      innerList.end();
-      innerList.start();
+      innerList.endList();
+      innerList.startList();
 
       holder.value = -4;
       innerInt.write(holder);
       holder.value = -5;
       innerInt.write(holder);
 
-      innerList.end();
-      list.end();
+      innerList.endList();
+      list.endList();
 
       final IntWriter numCol = map.integer("nums");
       holder.value = -28;
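
The hunks above touch every caller of the old ListWriter start()/end() pair. For
readability, here is the same pattern in isolation: a compilable sketch (not part of
this patch) that assumes a ListWriter has already been obtained elsewhere, for example
from a ComplexWriter root map as in the test above.

import org.apache.drill.exec.expr.holders.IntHolder;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
import org.apache.drill.exec.vector.complex.writer.IntWriter;

public class ListWriterRenameExample {
  /** Writes [[1, 2], [3]] through the supplied writer using the renamed methods. */
  static void writeNestedList(ListWriter list) {
    final IntHolder holder = new IntHolder();

    list.startList();                 // was: list.start()

    final ListWriter inner = list.list();
    final IntWriter innerInt = inner.integer();

    inner.startList();                // was: innerList.start()
    holder.value = 1;
    innerInt.write(holder);
    holder.value = 2;
    innerInt.write(holder);
    inner.endList();                  // was: innerList.end()

    inner.startList();
    holder.value = 3;
    innerInt.write(holder);
    inner.endList();

    list.endList();                   // was: list.end()
  }
}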

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/java-exec/src/test/resources/jsoninput/union/a.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/jsoninput/union/a.json b/exec/java-exec/src/test/resources/jsoninput/union/a.json
new file mode 100644
index 0000000..438dba3
--- /dev/null
+++ b/exec/java-exec/src/test/resources/jsoninput/union/a.json
@@ -0,0 +1,52 @@
+{
+  field1: 1,
+  field2: 1.2
+}
+{
+  field1: [
+    2
+  ],
+  field2: 1.2
+}
+{
+  field1: {
+    inner1: 3,
+    inner2: 4
+  },
+  field2: [
+    3,
+    4.0,
+    "5"
+  ]
+}
+{
+  field1: {
+    inner1: 3,
+    inner2: [
+      {
+          innerInner1: 1,
+          innerInner2: [
+            3,
+            "a"
+          ]
+      }
+    ]
+  },
+  field2: [
+    {
+      inner3: 7
+    },
+    4.0,
+    "5",
+    {
+      inner4: 9
+    },
+    [{
+      inner5: 10,
+      inner6: 11
+    },{
+      inner5: 12,
+      inner7: 13
+    }]
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java
index 35890ec..b5634bf 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java
@@ -70,7 +70,6 @@ public class TestJdbcDistQuery extends JdbcTestBase {
         + "from dfs_test.`%s/../../sample-data/regionsSF/`", WORKING_PATH));
   }
 
-
   @Test
   public void testSimpleQueryMultiFile() throws Exception{
     testQuery(String.format("select R_REGIONKEY, R_NAME "

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/protocol/src/main/java/org/apache/drill/common/types/MinorType.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/common/types/MinorType.java b/protocol/src/main/java/org/apache/drill/common/types/MinorType.java
index 423ed53..16bd53e 100644
--- a/protocol/src/main/java/org/apache/drill/common/types/MinorType.java
+++ b/protocol/src/main/java/org/apache/drill/common/types/MinorType.java
@@ -58,7 +58,8 @@ public enum MinorType implements com.dyuproject.protostuff.EnumLite<MinorType>
     INTERVALYEAR(38),
     INTERVALDAY(39),
     LIST(40),
-    GENERIC_OBJECT(41);
+    GENERIC_OBJECT(41),
+    UNION(42);
     
     public final int number;
     
@@ -113,6 +114,7 @@ public enum MinorType implements com.dyuproject.protostuff.EnumLite<MinorType>
             case 39: return INTERVALDAY;
             case 40: return LIST;
             case 41: return GENERIC_OBJECT;
+            case 42: return UNION;
             default: return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/protocol/src/main/java/org/apache/drill/common/types/TypeProtos.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/common/types/TypeProtos.java b/protocol/src/main/java/org/apache/drill/common/types/TypeProtos.java
index 74ac444..be7fe74 100644
--- a/protocol/src/main/java/org/apache/drill/common/types/TypeProtos.java
+++ b/protocol/src/main/java/org/apache/drill/common/types/TypeProtos.java
@@ -317,6 +317,10 @@ public final class TypeProtos {
      * <code>GENERIC_OBJECT = 41;</code>
      */
     GENERIC_OBJECT(36, 41),
+    /**
+     * <code>UNION = 42;</code>
+     */
+    UNION(37, 42),
     ;
 
     /**
@@ -606,6 +610,10 @@ public final class TypeProtos {
      * <code>GENERIC_OBJECT = 41;</code>
      */
     public static final int GENERIC_OBJECT_VALUE = 41;
+    /**
+     * <code>UNION = 42;</code>
+     */
+    public static final int UNION_VALUE = 42;
 
 
     public final int getNumber() { return value; }
@@ -649,6 +657,7 @@ public final class TypeProtos {
         case 39: return INTERVALDAY;
         case 40: return LIST;
         case 41: return GENERIC_OBJECT;
+        case 42: return UNION;
         default: return null;
       }
     }
@@ -1780,7 +1789,7 @@ public final class TypeProtos {
       "inor_type\030\001 \001(\0162\021.common.MinorType\022\036\n\004mo" +
       "de\030\002 \001(\0162\020.common.DataMode\022\r\n\005width\030\003 \001(" +
       "\005\022\021\n\tprecision\030\004 \001(\005\022\r\n\005scale\030\005 \001(\005\022\020\n\010t" +
-      "imeZone\030\006 \001(\005*\212\004\n\tMinorType\022\010\n\004LATE\020\000\022\007\n" +
+      "imeZone\030\006 \001(\005*\225\004\n\tMinorType\022\010\n\004LATE\020\000\022\007\n" +
       "\003MAP\020\001\022\013\n\007TINYINT\020\003\022\014\n\010SMALLINT\020\004\022\007\n\003INT" +
       "\020\005\022\n\n\006BIGINT\020\006\022\014\n\010DECIMAL9\020\007\022\r\n\tDECIMAL1" +
       "8\020\010\022\023\n\017DECIMAL28SPARSE\020\t\022\023\n\017DECIMAL38SPA" +
@@ -1793,9 +1802,10 @@ public final class TypeProtos {
       "\036\022\t\n\005UINT4\020\037\022\t\n\005UINT8\020 \022\022\n\016DECIMAL28DENS" +
       "E\020!\022\022\n\016DECIMAL38DENSE\020\"\022\010\n\004NULL\020%\022\020\n\014INT" +
       "ERVALYEAR\020&\022\017\n\013INTERVALDAY\020\'\022\010\n\004LIST\020(\022\022" +
-      "\n\016GENERIC_OBJECT\020)*4\n\010DataMode\022\014\n\010OPTION" +
-      "AL\020\000\022\014\n\010REQUIRED\020\001\022\014\n\010REPEATED\020\002B-\n\035org." +
-      "apache.drill.common.typesB\nTypeProtosH\001"
+      "\n\016GENERIC_OBJECT\020)\022\t\n\005UNION\020**4\n\010DataMod" +
+      "e\022\014\n\010OPTIONAL\020\000\022\014\n\010REQUIRED\020\001\022\014\n\010REPEATE" +
+      "D\020\002B-\n\035org.apache.drill.common.typesB\nTy",
+      "peProtosH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/drill/blob/eb6325dc/protocol/src/main/protobuf/Types.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/Types.proto b/protocol/src/main/protobuf/Types.proto
index d93bcb5..36899f6 100644
--- a/protocol/src/main/protobuf/Types.proto
+++ b/protocol/src/main/protobuf/Types.proto
@@ -64,6 +64,7 @@ enum MinorType {
     INTERVALDAY = 39; // Interval type specifying DAY to SECONDS
     LIST = 40;
     GENERIC_OBJECT = 41;
+    UNION = 42;
 }
 
 message MajorType {
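
Taken together, the protostuff enum, the regenerated TypeProtos, and Types.proto all
gain the new UNION entry at tag 42. For context only, a small sketch (not part of this
patch) of the regenerated protobuf classes in use; the builder and valueOf calls are
the standard protoc-generated ones.

import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;

public class UnionMinorTypeExample {
  public static void main(String[] args) {
    // The generated enum now resolves tag 42 to UNION.
    System.out.println(MinorType.valueOf(42));        // UNION
    System.out.println(MinorType.UNION.getNumber());  // 42

    // A MajorType carrying the new minor type, built with the generated builder.
    MajorType unionType = MajorType.newBuilder()
        .setMinorType(MinorType.UNION)
        .setMode(DataMode.OPTIONAL)
        .build();
    System.out.println(unionType.getMinorType());     // UNION
  }
}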

