drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject [01/16] incubator-drill git commit: DRILL-1324: Add mechanism to detect schema changes when adding a new primitive vector in a Map, RepeatedMap, RepeatedList vector
Date Sat, 08 Nov 2014 00:03:02 GMT
Repository: incubator-drill
Updated Branches:
  refs/heads/master c4e1c58f3 -> a8fd9758b


DRILL-1324: Add mechanism to detect schema changes when adding a new primitive vector in a
Map, RepeatedMap, RepeatedList vector


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/61053a8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/61053a8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/61053a8c

Branch: refs/heads/master
Commit: 61053a8c9c22e7cce45bb74e3e1894f2cfd42041
Parents: c4e1c58
Author: Mehant Baid <mehantr@gmail.com>
Authored: Wed Aug 13 18:50:07 2014 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Fri Nov 7 08:59:05 2014 -0800

----------------------------------------------------------------------
 .../src/main/codegen/templates/TypeHelper.java  | 10 ++++--
 .../drill/exec/physical/impl/ScanBatch.java     |  8 +++--
 .../impl/project/ProjectRecordBatch.java        |  4 +--
 .../exec/record/AbstractSingleRecordBatch.java  | 10 +++++-
 .../drill/exec/record/VectorContainer.java      |  8 ++++-
 .../org/apache/drill/exec/util/CallBack.java    | 23 +++++++++++++
 .../drill/exec/vector/SchemaChangeCallBack.java | 36 ++++++++++++++++++++
 .../drill/exec/vector/complex/MapVector.java    | 14 +++++---
 .../exec/vector/complex/RepeatedListVector.java | 22 +++++++-----
 .../exec/vector/complex/RepeatedMapVector.java  | 14 +++++---
 .../vector/complex/impl/ComplexWriterImpl.java  |  2 +-
 .../complex/impl/VectorContainerWriter.java     |  2 +-
 .../vector/complex/writer/TestJsonReader.java   |  2 +-
 .../vector/complex/writer/TestRepeated.java     |  2 +-
 14 files changed, 128 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/61053a8c/exec/java-exec/src/main/codegen/templates/TypeHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/TypeHelper.java b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
index cb6a030..c83c301 100644
--- a/exec/java-exec/src/main/codegen/templates/TypeHelper.java
+++ b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
@@ -31,6 +31,7 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.vector.accessor.*;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.apache.drill.exec.util.CallBack;
 
 public class TypeHelper {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TypeHelper.class);
@@ -258,6 +259,9 @@ public class TypeHelper {
   }
 
   public static ValueVector getNewVector(MaterializedField field, BufferAllocator allocator){
+    return getNewVector(field, allocator, null);
+  }
+  public static ValueVector getNewVector(MaterializedField field, BufferAllocator allocator,
CallBack callBack){
     MajorType type = field.getType();
 
     switch (type.getMinorType()) {
@@ -266,14 +270,14 @@ public class TypeHelper {
     case MAP:
       switch (type.getMode()) {
       case REQUIRED:
-        return new MapVector(field, allocator);
+        return new MapVector(field, allocator, callBack);
       case REPEATED:
-        return new RepeatedMapVector(field, allocator);
+        return new RepeatedMapVector(field, allocator, callBack);
       }
     case LIST:
       switch (type.getMode()) {
       case REPEATED:
-        return new RepeatedListVector(field, allocator);
+        return new RepeatedListVector(field, allocator, callBack);
       }    
 <#list vv.  types as type>
   <#list type.minor as minor>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/61053a8c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index ac65e40..4ed1180 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -47,8 +47,10 @@ import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.util.CallBack;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.collect.Lists;
@@ -80,6 +82,7 @@ public class ScanBatch implements RecordBatch {
   private String partitionColumnDesignator;
   private boolean first = false;
   private boolean done = false;
+  private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
 
   public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader>
readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns)
throws ExecutionSetupException {
     this.context = context;
@@ -313,7 +316,7 @@ public class ScanBatch implements RecordBatch {
 
       if (v == null || v.getClass() != clazz) {
         // Field does not exist add it to the map and the output container
-        v = TypeHelper.getNewVector(field, oContext.getAllocator());
+        v = TypeHelper.getNewVector(field, oContext.getAllocator(), callBack);
         if (!clazz.isAssignableFrom(v.getClass())) {
           throw new SchemaChangeException(String.format("The class that was provided %s does
not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
         }
@@ -345,7 +348,8 @@ public class ScanBatch implements RecordBatch {
 
     @Override
     public boolean isNewSchema() {
-      if (schemaChange == true) {
+      // Check if top level schema has changed, second condition checks if one of the deeper
map schema has changed
+      if (schemaChange == true || callBack.getSchemaChange()) {
         schemaChange = false;
         return true;
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/61053a8c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 486fb12..27cb1f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -343,7 +343,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>
{
               }
 
               MaterializedField outputField = MaterializedField.create(name, expr.getMajorType());
-              ValueVector vv = container.addOrGet(outputField);
+              ValueVector vv = container.addOrGet(outputField, callBack);
               allocationVectors.add(vv);
               TypedFieldId fid = container.getValueVectorId(outputField.getPath());
               ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr,
true);
@@ -407,7 +407,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>
{
         }
       } else{
         // need to do evaluation.
-        ValueVector vector = container.addOrGet(outputField);
+        ValueVector vector = container.addOrGet(outputField, callBack);
         allocationVectors.add(vector);
         TypedFieldId fid = container.getValueVectorId(outputField.getPath());
         ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/61053a8c/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index f6ae14f..1ef0345 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -21,7 +21,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
 
 public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> extends
AbstractRecordBatch<T> {
   final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
@@ -30,6 +30,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator>
exte
   private boolean first = true;
   protected boolean done = false;
   protected boolean outOfMemory = false;
+  protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
 
   public AbstractSingleRecordBatch(T popConfig, FragmentContext context, RecordBatch incoming)
throws OutOfMemoryException {
     super(popConfig, context);
@@ -86,10 +87,17 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator>
exte
       assert !first : "First batch should be OK_NEW_SCHEMA";
       container.zeroVectors();
       doWork();
+
       if (outOfMemory) {
         outOfMemory = false;
         return IterOutcome.OUT_OF_MEMORY;
       }
+
+      // Check if schema has changed
+      if (callBack.getSchemaChange()) {
+        return IterOutcome.OK_NEW_SCHEMA;
+      }
+
       return upstream; // change if upstream changed, otherwise normal.
     default:
       throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/61053a8c/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 8e269b8..fde727f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField.Key;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractMapVector;
 
@@ -80,14 +81,19 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto
     add(vv, releasable);
   }
 
+
   public <T extends ValueVector> T addOrGet(MaterializedField field) {
+    return addOrGet(field, null);
+  }
+
+  public <T extends ValueVector> T addOrGet(MaterializedField field, SchemaChangeCallBack
callBack) {
     TypedFieldId id = getValueVectorId(field.getPath());
     ValueVector v = null;
     Class clazz = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getType().getMode());
     if (id != null) {
       v = getValueAccessorById(id.getFieldIds()).getValueVector();
       if (id.getFieldIds().length == 1 && clazz != null && !clazz.isAssignableFrom(v.getClass()))
{
-        ValueVector newVector = TypeHelper.getNewVector(field, this.oContext.getAllocator());
+        ValueVector newVector = TypeHelper.getNewVector(field, this.oContext.getAllocator(),
callBack);
         replace(v, newVector);
         return (T) newVector;
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/61053a8c/exec/java-exec/src/main/java/org/apache/drill/exec/util/CallBack.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/CallBack.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/CallBack.java
new file mode 100644
index 0000000..0243f8a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/CallBack.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+
+public interface CallBack {
+  public void doWork();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/61053a8c/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java
new file mode 100644
index 0000000..386ee34
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.vector;
+
+import org.apache.drill.exec.util.CallBack;
+
+public class SchemaChangeCallBack implements CallBack {
+  private boolean schemaChange = false;
+
+  public void doWork() {
+    schemaChange = true;
+  }
+
+  public boolean getSchemaChange() {
+    boolean schemaChange = this.schemaChange;
+    this.schemaChange = false;
+    return schemaChange;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/61053a8c/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
index 1e4c8c4..7a0afdb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.util.CallBack;
 import org.apache.drill.exec.util.JsonStringHashMap;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector.MapSingleCopier;
@@ -65,16 +66,18 @@ public class MapVector extends AbstractContainerVector {
   private final BufferAllocator allocator;
   private MaterializedField field;
   private int valueCount;
+  private CallBack callBack;
 
-  public MapVector(String path, BufferAllocator allocator) {
+  public MapVector(String path, BufferAllocator allocator, CallBack callBack){
     this.field = MaterializedField.create(SchemaPath.getSimplePath(path), TYPE);
     this.allocator = allocator;
+    this.callBack = callBack;
   }
-  public MapVector(MaterializedField field, BufferAllocator allocator) {
+  public MapVector(MaterializedField field, BufferAllocator allocator, CallBack callBack){
     this.field = field;
     this.allocator = allocator;
+    this.callBack = callBack;
   }
-
   @Override
   public int size() {
     return vectors.size();
@@ -120,6 +123,9 @@ public class MapVector extends AbstractContainerVector {
       v = TypeHelper.getNewVector(field.getPath(), name, allocator, type);
       Preconditions.checkNotNull(v, String.format("Failure to create vector of type %s.",
type));
       put(name, v);
+      if (callBack != null) {
+        callBack.doWork();
+      }
     }
     return typeify(v, clazz);
 
@@ -222,7 +228,7 @@ public class MapVector extends AbstractContainerVector {
     private MapVector to;
 
     public MapTransferPair(SchemaPath path) {
-      MapVector v = new MapVector(MaterializedField.create(path, TYPE), allocator);
+      MapVector v = new MapVector(MaterializedField.create(path, TYPE), allocator, callBack);
       pairs = new TransferPair[vectors.size()];
       int i =0;
       for (Map.Entry<String, ValueVector> e : vectors.entrySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/61053a8c/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index c75b359..362d806 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.util.JsonStringArrayList;
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.drill.exec.util.CallBack;
 import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.impl.NullReader;
@@ -61,6 +62,7 @@ public class RepeatedListVector extends AbstractContainerVector implements
Repea
   private final RepeatedListReaderImpl reader = new RepeatedListReaderImpl(null, this);
   private int allocationValueCount = 4000;
   private int allocationMonitor = 0;
+  private CallBack callBack;
 
   private int lastSet = 0;
 
@@ -68,10 +70,15 @@ public class RepeatedListVector extends AbstractContainerVector implements
Repea
 
   public static MajorType TYPE = Types.repeated(MinorType.LIST);
 
-  public RepeatedListVector(MaterializedField field, BufferAllocator allocator) {
+  public RepeatedListVector(MaterializedField field, BufferAllocator allocator, CallBack
callBack){
     this.allocator = allocator;
     this.offsets = new UInt4Vector(null, allocator);
     this.field = field;
+    this.callBack = callBack;
+  }
+
+  public RepeatedListVector(SchemaPath path, BufferAllocator allocator, CallBack callBack){
+    this(MaterializedField.create(path, TYPE), allocator, callBack);
   }
 
   @Override
@@ -93,10 +100,6 @@ public class RepeatedListVector extends AbstractContainerVector implements
Repea
     return primitiveVectors;
   }
 
-  public RepeatedListVector(SchemaPath path, BufferAllocator allocator) {
-    this(MaterializedField.create(path, TYPE), allocator);
-  }
-
   transient private RepeatedListTransferPair ephPair;
 
   public boolean copyFromSafe(int fromIndex, int thisIndex, RepeatedListVector from) {
@@ -313,7 +316,7 @@ public class RepeatedListVector extends AbstractContainerVector implements
Repea
     }
 
     private RepeatedListTransferPair(SchemaPath path) {
-      this.to = new RepeatedListVector(path, allocator);
+      this.to = new RepeatedListVector(path, allocator, callBack);
       vectorTransfer = vector.getTransferPair();
       this.to.vector = vectorTransfer.getTo();
     }
@@ -437,8 +440,11 @@ public class RepeatedListVector extends AbstractContainerVector implements
Repea
   public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T>
clazz) {
     Preconditions.checkArgument(name == null);
 
-    if (vector == null) {
-      vector = TypeHelper.getNewVector(MaterializedField.create(field.getPath().getUnindexedArrayChild(),
type), allocator);
+    if(vector == null){
+      vector = TypeHelper.getNewVector(MaterializedField.create(field.getPath().getUnindexedArrayChild(),
type), allocator, callBack);
+      if (callBack != null) {
+        callBack.doWork();
+      }
     }
     return typeify(vector, clazz);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/61053a8c/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index beb2475..d73aa7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.util.CallBack;
 import org.apache.drill.exec.util.JsonStringArrayList;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.BaseDataValueVector;
@@ -68,12 +69,14 @@ public class RepeatedMapVector extends AbstractContainerVector implements
Repeat
   private final BufferAllocator allocator;
   private final MaterializedField field;
   private int lastPopulatedValueIndex = -1;
+  private int lastSet = -1;
+  private CallBack callBack;
 
-  public RepeatedMapVector(MaterializedField field, BufferAllocator allocator) {
+  public RepeatedMapVector(MaterializedField field, BufferAllocator allocator, CallBack callBack){
     this.field = field;
     this.allocator = allocator;
     this.offsets = new UInt4Vector(null, allocator);
-
+    this.callBack = callBack;
   }
 
   @Override
@@ -122,6 +125,9 @@ public class RepeatedMapVector extends AbstractContainerVector implements
Repeat
       v = TypeHelper.getNewVector(field.getPath(), name, allocator, type);
       Preconditions.checkNotNull(v, String.format("Failure to create vector of type %s.",
type));
       put(name, v);
+      if (callBack != null) {
+        callBack.doWork();
+      }
     }
     return typeify(v, clazz);
   }
@@ -243,7 +249,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements
Repeat
     public SingleMapTransferPair(SchemaPath path) {
 
       MaterializedField mf = MaterializedField.create(path, Types.required(field.getType().getMinorType()));
-      MapVector v = new MapVector(mf, allocator);
+      MapVector v = new MapVector(mf, allocator, callBack);
       pairs = new TransferPair[vectors.size()];
       int i =0;
       for (Map.Entry<String, ValueVector> e : vectors.entrySet()) {
@@ -310,7 +316,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements
Repeat
     private final RepeatedMapVector from = RepeatedMapVector.this;
 
     public MapTransferPair(SchemaPath path) {
-      RepeatedMapVector v = new RepeatedMapVector(MaterializedField.create(path, TYPE), allocator);
+      RepeatedMapVector v = new RepeatedMapVector(MaterializedField.create(path, TYPE), allocator,
callBack);
       pairs = new TransferPair[vectors.size()];
       int i =0;
       for (Map.Entry<String, ValueVector> e : vectors.entrySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/61053a8c/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
index 920a4f3..18b5e9e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
@@ -183,7 +183,7 @@ public class ComplexWriterImpl extends AbstractFieldWriter implements
ComplexWri
     private final VectorContainer vc;
 
     public VectorAccessibleFacade(VectorContainer vc) {
-      super("", null);
+      super("", null, null);
       this.vc = vc;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/61053a8c/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
index 417d3ef..6b8a523 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
@@ -91,7 +91,7 @@ public class VectorContainerWriter extends AbstractFieldWriter implements
Comple
   private class SpecialMapVector extends MapVector {
 
     public SpecialMapVector() {
-      super("", null);
+      super("", null, null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/61053a8c/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
index f207bba..bf81ba2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
@@ -290,7 +290,7 @@ public class TestJsonReader extends BaseTestQuery {
 //        "{ \"integer\" : -2002,\n" +
 //        "  \"float\"   : -1.2 \n" +
 //        "}";
-    MapVector v = new MapVector("", allocator);
+    MapVector v = new MapVector("", allocator, null);
     ComplexWriterImpl writer = new ComplexWriterImpl("col", v);
     writer.allocate();
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/61053a8c/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java
index a85a841..3f125fa 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java
@@ -130,7 +130,7 @@ public class TestRepeated {
      *
      */
 
-    MapVector v = new MapVector("", allocator);
+    MapVector v = new MapVector("", allocator, null);
     ComplexWriterImpl writer = new ComplexWriterImpl("col", v);
     writer.allocate();
 


Mime
View raw message