drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h.@apache.org
Subject [1/2] drill git commit: DRILL-2150: Create an abstraction for repeated value vectors.
Date Mon, 11 May 2015 08:18:01 GMT
Repository: drill
Updated Branches:
  refs/heads/master a3ec52a72 -> 4689468ef


http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index e4a0997..ab9992e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -37,12 +37,11 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
  * A vector when instantiated, relies on a {@link org.apache.drill.exec.record.DeadBuf dead buffer}. It is important
  * that vector is allocated before attempting to read or write.
  *
- * @param <V>  actual value vector type
  * @param <A>  accessor type that supports reading from this vector
  * @param <M>  mutator type that supports writing to this vector
  */
-public interface ValueVector<V extends ValueVector, A extends ValueVector.Accessor, M extends ValueVector.Mutator>
-    extends Closeable, Iterable<ValueVector<V, A, M>> {
+public interface ValueVector<A extends ValueVector.Accessor, M extends ValueVector.Mutator>
+    extends Closeable, Iterable<ValueVector> {
 
   /**
    * Allocate new buffers. ValueVector implements logic to determine how much to allocate.
@@ -94,7 +93,7 @@ public interface ValueVector<V extends ValueVector, A extends ValueVector.Access
    * Returns a new {@link org.apache.drill.exec.record.TransferPair transfer pair} that is used to transfer underlying
    * buffers into the target vector.
    */
-  TransferPair makeTransferPair(V target);
+  TransferPair makeTransferPair(ValueVector target);
 
   /**
    * Returns an {@link org.apache.drill.exec.vector.ValueVector.Accessor accessor} that is used to read from this vector

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VectorDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VectorDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VectorDescriptor.java
new file mode 100644
index 0000000..9a29848
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VectorDescriptor.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.types.TypeProtos;
+
+public class VectorDescriptor { // Pairs a vector name with its TypeProtos.MajorType; both fields are final and checked non-null.
+  private static final String DEFAULT_NAME = new String("NONE"); // fresh String instance, used by hasName() as an identity sentinel for an unnamed descriptor
+
+  private final TypeProtos.MajorType type;
+  private final String name;
+
+  public VectorDescriptor(TypeProtos.MajorType type) { // builds an unnamed descriptor for the given type
+    this(DEFAULT_NAME, type); // substitute the sentinel so hasName() reports false
+  }
+
+  public VectorDescriptor(String name,TypeProtos.MajorType type) { // builds a named descriptor; a null name or type is rejected by checkNotNull
+    this.name = Preconditions.checkNotNull(name, "name cannot be null");
+    this.type = Preconditions.checkNotNull(type, "type cannot be null");
+  }
+
+  public TypeProtos.MajorType getType() { // never null
+    return type;
+  }
+
+  public String getName() { // returns the sentinel instance when no name was supplied
+    return name;
+  }
+
+  public boolean hasName() { // true unless this descriptor was built through the type-only constructor or factory
+    return name != DEFAULT_NAME; // reference comparison, not equals; NOTE(review): presumably intentional so a caller-supplied name spelled NONE still counts as a name - confirm
+  }
+
+  public static VectorDescriptor create(String name, TypeProtos.MajorType type) { // factory equivalent of the two-argument constructor
+    return new VectorDescriptor(name, type);
+  }
+
+  public static VectorDescriptor create(TypeProtos.MajorType type) { // factory equivalent of the type-only constructor
+    return new VectorDescriptor(type);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ZeroVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
new file mode 100644
index 0000000..db8d327
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+import java.util.Iterator;
+
+import com.google.common.collect.Iterators;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.complex.impl.NullReader;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+
+public class ZeroVector implements ValueVector { // No-op ValueVector: owns no buffers, reports zero values and ignores every write. Implements the raw ValueVector type.
+  public final static ZeroVector INSTANCE = new ZeroVector(); // shared instance; the constructor below is public, so other instances can still be created
+
+  private final MaterializedField field = MaterializedField.create("[DEFAULT]", Types.LATE_BIND_TYPE); // placeholder field returned by getField()
+
+  private final TransferPair defaultPair = new TransferPair() { // transfer pair whose operations all do nothing
+    @Override
+    public void transfer() { }
+
+    @Override
+    public void splitAndTransfer(int startIndex, int length) { }
+
+    @Override
+    public ValueVector getTo() { // the target is this same vector, whatever target was requested
+      return ZeroVector.this;
+    }
+
+    @Override
+    public void copyValueSafe(int from, int to) { }
+  };
+
+  private final Accessor defaultAccessor = new Accessor() { // read side: empty, and every index reads as null
+    @Override
+    public Object getObject(int index) {
+      return null;
+    }
+
+    @Override
+    public int getValueCount() {
+      return 0;
+    }
+
+    @Override
+    public boolean isNull(int index) { // index is not inspected
+      return true;
+    }
+  };
+
+  private final Mutator defaultMutator = new Mutator() { // write side: every mutation is silently dropped
+    @Override
+    public void setValueCount(int valueCount) { }
+
+    @Override
+    public void reset() { }
+
+    @Override
+    public void generateTestData(int values) { }
+  };
+
+  public ZeroVector() { }
+
+  @Override
+  public void close() { } // nothing to release
+
+  @Override
+  public void clear() { } // nothing to clear
+
+  @Override
+  public MaterializedField getField() {
+    return field;
+  }
+
+  @Override
+  public TransferPair getTransferPair() { // always the same shared no-op pair
+    return defaultPair;
+  }
+
+  @Override
+  public UserBitShared.SerializedField getMetadata() { // metadata built from the placeholder field; buffer length and value count both come out as 0 here
+    return getField()
+        .getAsBuilder()
+        .setBufferLength(getBufferSize())
+        .setValueCount(getAccessor().getValueCount())
+        .build();
+  }
+
+  @Override
+  public Iterator iterator() { // raw Iterator; there are no child vectors to iterate
+    return Iterators.emptyIterator();
+  }
+
+  @Override
+  public int getBufferSize() {
+    return 0;
+  }
+
+  @Override
+  public DrillBuf[] getBuffers(boolean clear) { // the clear flag is ignored; a new empty array is returned on each call
+    return new DrillBuf[0];
+  }
+
+  @Override
+  public void allocateNew() throws OutOfMemoryRuntimeException { // delegates to allocateNewSafe() and discards its result, so nothing is thrown from here
+    allocateNewSafe();
+  }
+
+  @Override
+  public boolean allocateNewSafe() { // reports success without allocating anything
+    return true;
+  }
+
+  @Override
+  public void setInitialCapacity(int numRecords) { } // capacity hint is ignored
+
+  @Override
+  public int getValueCapacity() {
+    return 0;
+  }
+
+  @Override
+  public TransferPair getTransferPair(FieldReference ref) { // ref is ignored
+    return defaultPair;
+  }
+
+  @Override
+  public TransferPair makeTransferPair(ValueVector target) { // target is ignored; the pair still reports this vector from getTo()
+    return defaultPair;
+  }
+
+  @Override
+  public Accessor getAccessor() {
+    return defaultAccessor;
+  }
+
+  @Override
+  public Mutator getMutator() {
+    return defaultMutator;
+  }
+
+  @Override
+  public FieldReader getReader() { // reads go through the shared NullReader instance
+    return NullReader.INSTANCE;
+  }
+
+  @Override
+  public void load(UserBitShared.SerializedField metadata, DrillBuf buffer) { } // incoming metadata and buffer are ignored
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
index 4138839..b615b66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -101,7 +101,8 @@ public class MapVector extends AbstractMapVector {
 
   @Override
   public void setInitialCapacity(int numRecords) {
-    for (ValueVector v : (ValueVector<?,?,?>)this) {
+    final Iterable<ValueVector> container = this;
+    for (ValueVector v : container) {
       v.setInitialCapacity(numRecords);
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index c061029..b5de8b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -17,13 +17,12 @@
  */
 package org.apache.drill.exec.vector.complex;
 
+import com.google.common.base.Preconditions;
 import io.netty.buffer.DrillBuf;
 
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MajorType;
@@ -33,380 +32,391 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.holders.ComplexHolder;
 import org.apache.drill.exec.expr.holders.RepeatedListHolder;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.util.JsonStringArrayList;
-import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.drill.exec.vector.AddOrGetResult;
+import org.apache.drill.exec.vector.BaseRepeatedValueVector;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVectorLike;
 import org.apache.drill.exec.util.CallBack;
+import org.apache.drill.exec.vector.RepeatedValueVector;
 import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VectorDescriptor;
 import org.apache.drill.exec.vector.complex.impl.NullReader;
 import org.apache.drill.exec.vector.complex.impl.RepeatedListReaderImpl;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
-import com.google.common.base.Preconditions;
-
 
-public class RepeatedListVector extends AbstractContainerVector implements RepeatedFixedWidthVector{
+public class RepeatedListVector extends AbstractContainerVector
+    implements RepeatedValueVector, RepeatedFixedWidthVectorLike {
 
   public final static MajorType TYPE = Types.repeated(MinorType.LIST);
-
-  private final UInt4Vector offsets;   // offsets to start of each record
-  private final Mutator mutator = new Mutator();
-  private final RepeatedListAccessor accessor = new RepeatedListAccessor();
-  private ValueVector vector;
   private final RepeatedListReaderImpl reader = new RepeatedListReaderImpl(null, this);
-  private final EmptyValuePopulator emptyPopulator;
+  private final DelegateRepeatedVector delegate;
+
+  protected static class DelegateRepeatedVector
+      extends BaseRepeatedValueVector<DelegateRepeatedVector.RepeatedListAccessor, DelegateRepeatedVector.RepeatedListMutator> {
+
+    private final RepeatedListAccessor accessor = new RepeatedListAccessor();
+    private final RepeatedListMutator mutator = new RepeatedListMutator();
+    private final EmptyValuePopulator emptyPopulator;
+    private transient DelegateTransferPair ephPair;
+
+    public class RepeatedListAccessor extends BaseRepeatedValueVector.BaseRepeatedAccessor {
+
+      @Override
+      public Object getObject(int index) {
+        List<Object> list = new JsonStringArrayList();
+        final int start = offsets.getAccessor().get(index);
+        final int until = offsets.getAccessor().get(index+1);
+        for (int i = start; i < until; i++) {
+          list.add(vector.getAccessor().getObject(i));
+        }
+        return list;
+      }
 
+      public void get(int index, RepeatedListHolder holder) {
+        assert index <= getValueCapacity();
+        holder.start = getOffsetVector().getAccessor().get(index);
+        holder.end = getOffsetVector().getAccessor().get(index+1);
+      }
 
-  public RepeatedListVector(SchemaPath path, BufferAllocator allocator, CallBack callBack){
-    this(MaterializedField.create(path, TYPE), allocator, callBack);
-  }
+      public void get(int index, ComplexHolder holder) {
+        final FieldReader reader = getReader();
+        reader.setPosition(index);
+        holder.reader = reader;
+      }
 
-  public RepeatedListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack){
-    super(field, allocator, callBack);
-    int childrenSize = field.getChildren().size();
+      public void get(int index, int arrayIndex, ComplexHolder holder) {
+        final RepeatedListHolder listHolder = new RepeatedListHolder();
+        get(index, listHolder);
+        int offset = listHolder.start + arrayIndex;
+        if (offset >= listHolder.end) {
+          holder.reader = NullReader.INSTANCE;
+        } else {
+          FieldReader r = getDataVector().getReader();
+          r.setPosition(offset);
+          holder.reader = r;
+        }
+      }
+    }
 
-    // repeated list vector should not have more than one child
-    assert childrenSize <= 1;
+    public class RepeatedListMutator extends BaseRepeatedValueVector.BaseRepeatedMutator {
 
-    if (childrenSize > 0) {
-      MaterializedField child = field.getChildren().iterator().next();
-      setVector(TypeHelper.getNewVector(child, allocator, callBack));
-    }
+      public int add(int index) {
+        final int curEnd = getOffsetVector().getAccessor().get(index+1);
+        getOffsetVector().getMutator().setSafe(index + 1, curEnd + 1);
+        return curEnd;
+      }
 
-    this.offsets = new UInt4Vector(null, allocator);
-    this.emptyPopulator = new EmptyValuePopulator(offsets);
-  }
+      @Override
+      public void startNewValue(int index) {
+        emptyPopulator.populate(index+1);
+        super.startNewValue(index);
+      }
 
-  @Override
-  public RepeatedListReaderImpl getReader() {
-    return reader;
-  }
+      @Override
+      public void setValueCount(int valueCount) {
+        emptyPopulator.populate(valueCount);
+        super.setValueCount(valueCount);
+      }
+    }
 
-  @Override
-  public int size() {
-    return vector != null ? 1 : 0;
-  }
 
+    public class DelegateTransferPair implements TransferPair {
+      private final DelegateRepeatedVector target;
+      private final TransferPair[] children;
 
-  transient private RepeatedListTransferPair ephPair;
+      public DelegateTransferPair(DelegateRepeatedVector target) {
+        this.target = Preconditions.checkNotNull(target);
+        if (target.getDataVector() == DEFAULT_DATA_VECTOR) {
+          target.addOrGetVector(VectorDescriptor.create(getDataVector().getField().getType()));
+          target.getDataVector().allocateNew();
+        }
+        this.children = new TransferPair[] {
+            getOffsetVector().makeTransferPair(target.getOffsetVector()),
+            getDataVector().makeTransferPair(target.getDataVector())
+        };
+      }
 
-  public void copyFromSafe(int fromIndex, int thisIndex, RepeatedListVector from) {
-    if(ephPair == null || ephPair.from != from) {
-      ephPair = (RepeatedListTransferPair) from.makeTransferPair(this);
-    }
-    ephPair.copyValueSafe(fromIndex, thisIndex);
-  }
+      @Override
+      public void transfer() {
+        for (TransferPair child:children) {
+          child.transfer();
+        }
+      }
 
-  public Mutator getMutator() {
-    return mutator;
-  }
+      @Override
+      public ValueVector getTo() {
+        return target;
+      }
 
-  public void setInitialCapacity(int numRecords) {
-    offsets.setInitialCapacity(numRecords + 1);
-    if (vector != null) {
-      vector.setInitialCapacity(numRecords * DEFAULT_REPEAT_PER_RECORD);
-    }
-  }
+      @Override
+      public void splitAndTransfer(int startIndex, int length) {
+        throw new UnsupportedOperationException("Repeated list does not support split & transfer operation");
+      }
 
-  @Override
-  public boolean allocateNewSafe() {
-    if (!offsets.allocateNewSafe()) {
-      return false;
+      @Override
+      public void copyValueSafe(int srcIndex, int destIndex) {
+        final RepeatedListHolder holder = new RepeatedListHolder();
+        getAccessor().get(srcIndex, holder);
+        target.emptyPopulator.populate(destIndex+1);
+        final TransferPair vectorTransfer = children[1];
+        int newIndex = target.getOffsetVector().getAccessor().get(destIndex);
+        //todo: make this a bulk copy.
+        for (int i = holder.start; i < holder.end; i++, newIndex++) {
+          vectorTransfer.copyValueSafe(i, newIndex);
+        }
+        target.getOffsetVector().getMutator().setSafe(destIndex + 1, newIndex);
+      }
     }
-    offsets.zeroVector();
 
-    if (vector != null) {
-      return vector.allocateNewSafe();
-    } else {
-      return true;
+    public DelegateRepeatedVector(SchemaPath path, BufferAllocator allocator) {
+      this(MaterializedField.create(path, TYPE), allocator);
     }
-  }
 
-  public void reAlloc() {
-    offsets.reAlloc();
-  }
-
-  public class Mutator implements ValueVector.Mutator, RepeatedMutator{
+    public DelegateRepeatedVector(MaterializedField field, BufferAllocator allocator) {
+      super(field, allocator);
+      this.emptyPopulator = new EmptyValuePopulator(getOffsetVector());
+    }
 
-    public void startNewGroup(int index) {
-      emptyPopulator.populate(index+1);
-      offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index));
+    @Override
+    public void allocateNew() throws OutOfMemoryRuntimeException {
+      if (!allocateNewSafe()) {
+        throw new OutOfMemoryRuntimeException();
+      }
     }
 
-    public int add(int index) {
-      final int prevEnd = offsets.getAccessor().get(index+1);
-      offsets.getMutator().setSafe(index+1, prevEnd+1);
-      return prevEnd;
+    @Override
+    protected SerializedField.Builder getMetadataBuilder() {
+      return super.getMetadataBuilder();
     }
 
-    public void setValueCount(int groupCount) {
-      emptyPopulator.populate(groupCount);
-      offsets.getMutator().setValueCount(groupCount+1);
+    @Override
+    public TransferPair getTransferPair(FieldReference ref) {
+      return makeTransferPair(new DelegateRepeatedVector(ref, allocator));
+    }
 
-      if (vector != null) {
-        int valueCount = offsets.getAccessor().get(groupCount);
-        vector.getMutator().setValueCount(valueCount);
-      }
+    @Override
+    public TransferPair makeTransferPair(ValueVector target) {
+      return new DelegateTransferPair(DelegateRepeatedVector.class.cast(target));
     }
 
     @Override
-    public void reset() { }
+    public RepeatedListAccessor getAccessor() {
+      return accessor;
+    }
 
     @Override
-    public void generateTestData(int values) {
+    public RepeatedListMutator getMutator() {
+      return mutator;
     }
 
     @Override
-    public void setValueCounts(int parentValueCount, int childValueCount) {
-      // TODO - determine if this should be implemented for this class
+    public FieldReader getReader() {
       throw new UnsupportedOperationException();
     }
 
     @Override
-    public void setRepetitionAtIndexSafe(int index, int repetitionCount) {
+    public void load(SerializedField metadata, DrillBuf buffer) {
+      //TODO(DRILL-2997): get rid of the notion of "group count" completely
+      final int valueCount = metadata.getGroupCount();
+      final int bufOffset = offsets.load(valueCount + 1, buffer);
+      final SerializedField childField = metadata.getChildList().get(0);
+      if (getDataVector() == DEFAULT_DATA_VECTOR) {
+        addOrGetVector(VectorDescriptor.create(childField.getMajorType()));
+      }
+
+      if (childField.getValueCount() == 0) {
+        vector.clear();
+      } else {
+        vector.load(childField, buffer.slice(bufOffset, childField.getBufferLength()));
+      }
     }
 
-    @Override
-    public BaseDataValueVector getDataVector() {
-      return null;  //To change body of implemented methods use File | Settings | File Templates.
+    public void copyFromSafe(int fromIndex, int thisIndex, DelegateRepeatedVector from) {
+      if(ephPair == null || ephPair.target != from) {
+        ephPair = DelegateTransferPair.class.cast(from.makeTransferPair(this));
+      }
+      ephPair.copyValueSafe(fromIndex, thisIndex);
     }
+
   }
 
-  public class RepeatedListAccessor implements RepeatedAccessor{
+  protected class RepeatedListTransferPair implements TransferPair {
+    private final TransferPair delegate;
 
-    @Override
-    public Object getObject(int index) {
-      List<Object> l = new JsonStringArrayList();
-      int end = offsets.getAccessor().get(index+1);
-      for (int i =  offsets.getAccessor().get(index); i < end; i++) {
-        l.add(vector.getAccessor().getObject(i));
-      }
-      return l;
+    public RepeatedListTransferPair(TransferPair delegate) {
+      this.delegate = delegate;
     }
 
-    public int getGroupSizeAtIndex(int index) {
-      return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index);
+    public void transfer() {
+      delegate.transfer();
     }
 
     @Override
-    public ValueVector getAllChildValues() {
-      return vector;
+    public void splitAndTransfer(int startIndex, int length) {
+      delegate.splitAndTransfer(startIndex, length);
     }
 
     @Override
-    public int getValueCount() {
-      return offsets.getAccessor().get(offsets.getAccessor().getValueCount() - 1);
+    public ValueVector getTo() {
+      final DelegateRepeatedVector delegateVector = DelegateRepeatedVector.class.cast(delegate.getTo());
+      return new RepeatedListVector(getField(), allocator, callBack, delegateVector);
     }
 
-    public void get(int index, RepeatedListHolder holder) {
-      assert index <= getValueCapacity();
-      holder.start = offsets.getAccessor().get(index);
-      holder.end = offsets.getAccessor().get(index+1);
+    @Override
+    public void copyValueSafe(int from, int to) {
+      delegate.copyValueSafe(from, to);
     }
+  }
 
-    public void get(int index, ComplexHolder holder) {
-      FieldReader reader = getReader();
-      reader.setPosition(index);
-      holder.reader = reader;
-    }
+  public RepeatedListVector(SchemaPath path, BufferAllocator allocator, CallBack callBack) {
+    this(MaterializedField.create(path, TYPE), allocator, callBack);
+  }
 
-    public void get(int index, int arrayIndex, ComplexHolder holder) {
-      RepeatedListHolder h = new RepeatedListHolder();
-      get(index, h);
-      int offset = h.start + arrayIndex;
+  public RepeatedListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack) {
+    this(field, allocator, callBack, new DelegateRepeatedVector(field, allocator));
+  }
 
-      if (offset >= h.end) {
-        holder.reader = NullReader.INSTANCE;
-      } else {
-        FieldReader r = vector.getReader();
-        r.setPosition(offset);
-        holder.reader = r;
-      }
-    }
+  protected RepeatedListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack, DelegateRepeatedVector delegate) {
+    super(field, allocator, callBack);
+    int childrenSize = field.getChildren().size();
 
-    @Override
-    public boolean isNull(int index) {
-      return false;
+    // repeated list vector should not have more than one child
+    assert childrenSize <= 1;
+    this.delegate = Preconditions.checkNotNull(delegate);
+    if (childrenSize > 0) {
+      MaterializedField child = field.getChildren().iterator().next();
+      addOrGetVector(VectorDescriptor.create(child.getType()));
+//      setVector(TypeHelper.getNewVector(child, allocator, callBack));
     }
+  }
+
 
     @Override
-    public int getGroupCount() {
-      final int valueCount = offsets.getAccessor().getValueCount();
-      return valueCount == 0 ? 0 : valueCount - 1;
-    }
+  public RepeatedListReaderImpl getReader() {
+    return reader;
   }
 
   @Override
-  public int getBufferSize() {
-    return offsets.getBufferSize() + vector.getBufferSize();
+  public DelegateRepeatedVector.RepeatedListAccessor getAccessor() {
+    return delegate.getAccessor();
   }
 
   @Override
-  public void close() {
-    offsets.close();
-    super.close();
+  public DelegateRepeatedVector.RepeatedListMutator getMutator() {
+    return delegate.getMutator();
   }
 
   @Override
-  public void clear() {
-    getMutator().reset();
-    offsets.clear();
-    if (vector != null) {
-      vector.clear();
-    }
+  public UInt4Vector getOffsetVector() {
+    return delegate.getOffsetVector();
   }
 
   @Override
-  public TransferPair getTransferPair() {
-    return new RepeatedListTransferPair(getField().getPath());
+  public ValueVector getDataVector() {
+    return delegate.getDataVector();
   }
 
-  public class RepeatedListTransferPair implements TransferPair{
-    private final RepeatedListVector from = RepeatedListVector.this;
-    private final RepeatedListVector to;
-    private final TransferPair vectorTransfer;
-
-    private RepeatedListTransferPair(RepeatedListVector to) {
-      this.to = to;
-      if (to.vector == null) {
-        to.vector = to.addOrGet(null, vector.getField().getType(), vector.getClass());
-        to.vector.allocateNew();
-      }
-      this.vectorTransfer = vector.makeTransferPair(to.vector);
-    }
+  @Override
+  public void allocateNew() throws OutOfMemoryRuntimeException {
+    delegate.allocateNew();
+  }
 
-    private RepeatedListTransferPair(SchemaPath path) {
-      this.to = new RepeatedListVector(path, allocator, callBack);
-      vectorTransfer = vector.getTransferPair();
-      this.to.vector = vectorTransfer.getTo();
-    }
+  @Override
+  public boolean allocateNewSafe() {
+    return delegate.allocateNewSafe();
+  }
 
-    @Override
-    public void transfer() {
-      offsets.transferTo(to.offsets);
-      vectorTransfer.transfer();
-      clear();
+  @Override
+  public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(VectorDescriptor descriptor) {
+    final AddOrGetResult<T> result = delegate.addOrGetVector(descriptor);
+    if (result.isCreated() && callBack != null) {
+      callBack.doWork();
     }
+    return result;
+  }
 
-    @Override
-    public ValueVector getTo() {
-      return to;
-    }
+  @Override
+  public int size() {
+    return delegate.size();
+  }
 
-    @Override
-    public void splitAndTransfer(int startIndex, int length) {
-      throw new UnsupportedOperationException();
-    }
+  @Override
+  public int getBufferSize() {
+    return delegate.getBufferSize();
+  }
 
-    @Override
-    public void copyValueSafe(int srcIndex, int destIndex) {
-      RepeatedListHolder holder = new RepeatedListHolder();
-      accessor.get(srcIndex, holder);
-      to.emptyPopulator.populate(destIndex+1);
-      int newIndex = to.offsets.getAccessor().get(destIndex);
-      //todo: make this a bulk copy.
-      for (int i = holder.start; i < holder.end; i++, newIndex++) {
-        vectorTransfer.copyValueSafe(i, newIndex);
-      }
-      to.offsets.getMutator().setSafe(destIndex + 1, newIndex);
-    }
+  @Override
+  public void close() {
+    delegate.close();
+  }
 
+  @Override
+  public void clear() {
+    delegate.clear();
   }
 
   @Override
-  public TransferPair makeTransferPair(ValueVector to) {
-    if (!(to instanceof RepeatedListVector ) ) {
-      throw new IllegalArgumentException("You can't make a transfer pair from an incompatible .");
-    }
-    return new RepeatedListTransferPair( (RepeatedListVector) to);
+  public TransferPair getTransferPair() {
+    return new RepeatedListTransferPair(delegate.getTransferPair());
   }
 
   @Override
   public TransferPair getTransferPair(FieldReference ref) {
-    return new RepeatedListTransferPair(ref);
+    return new RepeatedListTransferPair(delegate.getTransferPair(ref));
   }
 
   @Override
-  public int getValueCapacity() {
-    if (vector == null) {
-      return offsets.getValueCapacity() - 1;
-    }
-    return  Math.min(offsets.getValueCapacity() - 1, vector.getValueCapacity());
+  public TransferPair makeTransferPair(ValueVector to) {
+    final RepeatedListVector target = RepeatedListVector.class.cast(to);
+    return new RepeatedListTransferPair(delegate.makeTransferPair(target.delegate));
   }
 
   @Override
-  public RepeatedListAccessor getAccessor() {
-    return accessor;
+  public int getValueCapacity() {
+    return delegate.getValueCapacity();
   }
 
   @Override
   public DrillBuf[] getBuffers(boolean clear) {
-    DrillBuf[] buffers = ArrayUtils.addAll(offsets.getBuffers(false), vector.getBuffers(false));
-    if (clear) {
-      // does not make much sense but we have to retain buffers even when clear is set. refactor this interface.
-      for (DrillBuf buffer:buffers) {
-        buffer.retain();
-      }
-      clear();
-    }
-    return buffers;
+    return delegate.getBuffers(clear);
   }
 
-  protected void setVector(ValueVector newVector) {
-    vector = Preconditions.checkNotNull(newVector);
-    getField().addChild(newVector.getField());
-  }
 
   @Override
   public void load(SerializedField metadata, DrillBuf buf) {
-    SerializedField childField = metadata.getChildList().get(0);
-
-    int bufOffset = offsets.load(metadata.getValueCount()+1, buf);
-
-    MaterializedField fieldDef = MaterializedField.create(childField);
-    if (vector == null) {
-      setVector(TypeHelper.getNewVector(fieldDef, allocator));
-    }
-
-    if (childField.getValueCount() == 0) {
-      vector.clear();
-    } else {
-      vector.load(childField, buf.slice(bufOffset, childField.getBufferLength()));
-    }
+    delegate.load(metadata, buf);
   }
 
   @Override
   public SerializedField getMetadata() {
-    return getField() //
-        .getAsBuilder() //
-        .setBufferLength(getBufferSize()) //
-        .setValueCount(accessor.getGroupCount()) //
-        .addChild(vector.getMetadata()) //
-        .build();
+    return delegate.getMetadata();
   }
 
   @Override
   public Iterator<ValueVector> iterator() {
-    return Collections.singleton(vector).iterator();
+    return delegate.iterator();
+  }
+
+  @Override
+  public void setInitialCapacity(int numRecords) {
+    delegate.setInitialCapacity(numRecords);
   }
 
+  /**
+   * @deprecated
+   *   prefer using {@link #addOrGetVector(org.apache.drill.exec.vector.VectorDescriptor)} instead.
+   */
   @Override
   public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
-    Preconditions.checkArgument(name == null);
-
-    if(vector == null){
-      final MaterializedField child =  MaterializedField.create(getField().getPath().getUnindexedArrayChild(), type);
-      vector = TypeHelper.getNewVector(child, allocator, callBack);
-      setVector(vector);
-      if (callBack != null) {
-        callBack.doWork();
-      }
-    }
-    return typeify(vector, clazz);
+    final AddOrGetResult<T> result = addOrGetVector(VectorDescriptor.create(type));
+    return result.getVector();
   }
 
   @Override
@@ -414,18 +424,18 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
     if (name != null) {
       return null;
     }
-    return typeify(vector, clazz);
+    return typeify(delegate.getDataVector(), clazz);
   }
 
   @Override
-  public void allocateNew(int parentValueCount, int childValueCount) {
+  public void allocateNew(int valueCount, int innerValueCount) {
     clear();
-    offsets.allocateNew(parentValueCount + 1);
-    mutator.reset();
+    getOffsetVector().allocateNew(valueCount + 1);
+    getMutator().reset();
   }
 
   @Override
-  public int load(int parentValueCount, int childValueCount, DrillBuf buf) {
+  public int load(int valueCount, int innerValueCount, DrillBuf buf) {
     throw new UnsupportedOperationException();
   }
 
@@ -434,7 +444,18 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
     if (name != null) {
       return null;
     }
-    return new VectorWithOrdinal(vector, 0);
+    return new VectorWithOrdinal(delegate.getDataVector(), 0);
   }
 
+
+  public void copyFromSafe(int fromIndex, int thisIndex, RepeatedListVector from) {
+    delegate.copyFromSafe(fromIndex, thisIndex, from.delegate);
+  }
+
+
+//  protected void setVector(ValueVector newVector) {
+//    vector = Preconditions.checkNotNull(newVector);
+//    getField().addChild(newVector.getField());
+//  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index e5d48dd..a97847b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -41,11 +41,15 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.util.CallBack;
 import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.exec.vector.AddOrGetResult;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.drill.exec.vector.BaseRepeatedValueVector;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVectorLike;
+import org.apache.drill.exec.vector.RepeatedValueVector;
 import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VectorDescriptor;
 import org.apache.drill.exec.vector.complex.impl.NullReader;
 import org.apache.drill.exec.vector.complex.impl.RepeatedMapReaderImpl;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
@@ -53,7 +57,7 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
-public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixedWidthVector {
+public class RepeatedMapVector extends AbstractMapVector implements RepeatedValueVector, RepeatedFixedWidthVectorLike {
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RepeatedMapVector.class);
 
@@ -71,10 +75,27 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
     this.emptyPopulator = new EmptyValuePopulator(offsets);
   }
 
+  @Override
+  public UInt4Vector getOffsetVector() {
+    return offsets;
+  }
+
+  @Override
+  public ValueVector getDataVector() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(VectorDescriptor descriptor) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public void setInitialCapacity(int numRecords) {
     offsets.setInitialCapacity(numRecords + 1);
-    for(ValueVector v : (ValueVector<?,?,?>)this) {
-      v.setInitialCapacity(numRecords * DEFAULT_REPEAT_PER_RECORD);
+    final Iterable<ValueVector> container = this;
+    for(ValueVector v : container) {
+      v.setInitialCapacity(numRecords * RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD);
     }
   }
 
@@ -84,20 +105,16 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
   }
 
   @Override
-  public void allocateNew(int groupCount, int valueCount) {
+  public void allocateNew(int groupCount, int innerValueCount) {
     clear();
     offsets.allocateNew(groupCount+1);
     offsets.zeroVector();
     for (ValueVector v : getChildren()) {
-      AllocationHelper.allocatePrecomputedChildCount(v, groupCount, 50, valueCount);
+      AllocationHelper.allocatePrecomputedChildCount(v, groupCount, 50, innerValueCount);
     }
     mutator.reset();
   }
 
-  public void reAlloc() {
-    offsets.reAlloc();
-  }
-
   public Iterator<String> fieldNameIterator() {
     return getChildFieldNames().iterator();
   }
@@ -111,7 +128,7 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
 
   @Override
   public int getBufferSize() {
-    if (accessor.getGroupCount() == 0) {
+    if (getAccessor().getValueCount() == 0) {
       return 0;
     }
     long buffer = offsets.getBufferSize();
@@ -425,14 +442,15 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
     Preconditions.checkArgument(bufOffset == buf.capacity());
   }
 
+
   @Override
   public SerializedField getMetadata() {
     SerializedField.Builder b = getField() //
         .getAsBuilder() //
         .setBufferLength(getBufferSize()) //
-        .setGroupCount(accessor.getGroupCount())
+        .setGroupCount(accessor.getValueCount())
         // while we don't need to actually read this on load, we need it to make sure we don't skip deserialization of this vector
-        .setValueCount(accessor.getGroupCount());
+        .setValueCount(accessor.getInnerValueCount());
     for (ValueVector v : getChildren()) {
       b.addChild(v.getMetadata());
     }
@@ -467,16 +485,31 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
 
     @Override
     public int getValueCount() {
-      return offsets.getAccessor().get(offsets.getAccessor().getValueCount() - 1);
+      return Math.max(offsets.getAccessor().getValueCount() - 1, 0);
     }
 
-    public int getGroupSizeAtIndex(int index) {
+    @Override
+    public int getInnerValueCount() {
+      final int valueCount = getValueCount();
+      if (valueCount == 0) {
+        return 0;
+      }
+      return offsets.getAccessor().get(valueCount);
+    }
+
+    @Override
+    public int getInnerValueCountAt(int index) {
       return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index);
     }
 
     @Override
-    public ValueVector getAllChildValues() {
-      throw new UnsupportedOperationException("Cannot retrieve inner vector from repeated map.");
+    public boolean isEmpty(int index) {
+      return false;
+    }
+
+    @Override
+    public boolean isNull(int index) {
+      return false;
     }
 
     public void get(int index, RepeatedMapHolder holder) {
@@ -504,32 +537,18 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
       }
     }
 
-    @Override
-    public boolean isNull(int index) {
-      return false;
-    }
-
-    @Override
-    public int getGroupCount() {
-      final int valueCount = offsets.getAccessor().getValueCount();
-      return valueCount == 0 ? 0 : valueCount - 1;
-    }
   }
 
 
-  public class Mutator implements ValueVector.Mutator, RepeatedMutator {
+  public class Mutator implements RepeatedMutator {
 
-    public void startNewGroup(int index) {
+    @Override
+    public void startNewValue(int index) {
       emptyPopulator.populate(index+1);
       offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index));
     }
 
-    public int add(int index) {
-      final int prevEnd = offsets.getAccessor().get(index+1);
-      offsets.getMutator().setSafe(index + 1, prevEnd + 1);
-      return prevEnd;
-    }
-
+    @Override
     public void setValueCount(int topLevelValueCount) {
       emptyPopulator.populate(topLevelValueCount);
       offsets.getMutator().setValueCount(topLevelValueCount == 0 ? 0 : topLevelValueCount+1);
@@ -543,22 +562,12 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
     public void reset() { }
 
     @Override
-    public void generateTestData(int values) {
-    }
+    public void generateTestData(int values) { }
 
-    @Override
-    public void setValueCounts(int parentValueCount, int childValueCount) {
-      // TODO - determine if this should be implemented for this class
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void setRepetitionAtIndexSafe(int index, int repetitionCount) {
-    }
-
-    @Override
-    public BaseDataValueVector getDataVector() {
-      return null;  //To change body of implemented methods use File | Settings | File Templates.
+    public int add(int index) {
+      final int prevEnd = offsets.getAccessor().get(index+1);
+      offsets.getMutator().setSafe(index + 1, prevEnd + 1);
+      return prevEnd;
     }
   }
 
@@ -573,7 +582,7 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
   }
 
   @Override
-  public int load(int parentValueCount, int childValueCount, DrillBuf buf) {
+  public int load(int valueCount, int innerValueCount, DrillBuf buf) {
     throw new UnsupportedOperationException();
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
index 1564aea..2b1dff0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
@@ -32,24 +32,17 @@ import org.apache.drill.exec.expr.holders.NullableUInt4Holder;
 import org.apache.drill.exec.expr.holders.NullableVar16CharHolder;
 import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
 import org.apache.drill.exec.expr.holders.RepeatedFloat4Holder;
-import org.apache.drill.exec.expr.holders.RepeatedMapHolder;
 import org.apache.drill.exec.expr.holders.RepeatedVarBinaryHolder;
 import org.apache.drill.exec.expr.holders.UInt4Holder;
 import org.apache.drill.exec.expr.holders.VarCharHolder;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.vector.BitVector;
-import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.NullableFloat4Vector;
 import org.apache.drill.exec.vector.NullableUInt4Vector;
 import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.NullableVector;
-import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
-import org.apache.drill.exec.vector.RepeatedVariableWidthVector;
 import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VariableWidthVector;
-import org.apache.drill.exec.vector.VariableWidthVector.VariableWidthAccessor;
 import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.RepeatedListVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;


Mime
View raw message