Return-Path: X-Original-To: apmail-drill-commits-archive@www.apache.org Delivered-To: apmail-drill-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 27C0D18222 for ; Mon, 11 May 2015 08:18:02 +0000 (UTC) Received: (qmail 20990 invoked by uid 500); 11 May 2015 08:18:01 -0000 Delivered-To: apmail-drill-commits-archive@drill.apache.org Received: (qmail 20959 invoked by uid 500); 11 May 2015 08:18:01 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 20950 invoked by uid 99); 11 May 2015 08:18:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 May 2015 08:18:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A5532E01E2; Mon, 11 May 2015 08:18:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hg@apache.org To: commits@drill.apache.org Date: Mon, 11 May 2015 08:18:02 -0000 Message-Id: In-Reply-To: <16ee2f66a78749ec95595609238eeb78@git.apache.org> References: <16ee2f66a78749ec95595609238eeb78@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] drill git commit: DRILL-2150: Create an abstraction for repeated value vectors. DRILL-2150: Create an abstraction for repeated value vectors. 
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4689468e Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4689468e Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4689468e Branch: refs/heads/master Commit: 4689468ef11a70c782f64af451807e1e10cdce65 Parents: a3ec52a Author: Hanifi Gunes Authored: Wed Apr 29 14:54:24 2015 -0700 Committer: Hanifi Gunes Committed: Mon May 11 01:20:26 2015 -0700 ---------------------------------------------------------------------- .../main/codegen/templates/ComplexReaders.java | 2 +- .../main/codegen/templates/ComplexWriters.java | 16 +- .../codegen/templates/FixedValueVectors.java | 1 + .../src/main/codegen/templates/ListWriters.java | 6 +- .../src/main/codegen/templates/MapWriters.java | 2 +- .../codegen/templates/RepeatedValueVectors.java | 240 ++------- .../exception/SchemaChangeRuntimeException.java | 42 ++ .../impl/flatten/FlattenRecordBatch.java | 25 +- .../physical/impl/flatten/FlattenTemplate.java | 25 +- .../exec/physical/impl/flatten/Flattener.java | 6 +- .../impl/project/ProjectRecordBatch.java | 12 +- .../apache/drill/exec/store/VectorHolder.java | 10 +- .../text/compliant/RepeatedVarCharOutput.java | 8 +- .../columnreaders/FixedWidthRepeatedReader.java | 22 +- .../columnreaders/ParquetRecordReader.java | 15 +- .../exec/store/text/DrillTextRecordReader.java | 2 +- .../drill/exec/vector/AddOrGetResult.java | 38 ++ .../drill/exec/vector/AllocationHelper.java | 8 +- .../drill/exec/vector/BaseDataValueVector.java | 5 +- .../exec/vector/BaseRepeatedValueVector.java | 206 ++++++++ .../drill/exec/vector/BaseValueVector.java | 21 +- .../drill/exec/vector/ContainerVectorLike.java | 39 ++ .../exec/vector/RepeatedFixedWidthVector.java | 53 -- .../vector/RepeatedFixedWidthVectorLike.java | 56 ++ .../drill/exec/vector/RepeatedValueVector.java | 85 +++ .../vector/RepeatedVariableWidthVector.java | 47 -- .../vector/RepeatedVariableWidthVectorLike.java | 
47 ++ .../drill/exec/vector/RepeatedVector.java | 25 - .../drill/exec/vector/SchemaChangeCallBack.java | 6 +- .../apache/drill/exec/vector/ValueVector.java | 7 +- .../drill/exec/vector/VectorDescriptor.java | 57 ++ .../apache/drill/exec/vector/ZeroVector.java | 170 ++++++ .../drill/exec/vector/complex/MapVector.java | 3 +- .../exec/vector/complex/RepeatedListVector.java | 525 ++++++++++--------- .../exec/vector/complex/RepeatedMapVector.java | 111 ++-- .../exec/record/vector/TestValueVector.java | 7 - 36 files changed, 1246 insertions(+), 704 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/codegen/templates/ComplexReaders.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/ComplexReaders.java b/exec/java-exec/src/main/codegen/templates/ComplexReaders.java index fa1dac4..068efb4 100644 --- a/exec/java-exec/src/main/codegen/templates/ComplexReaders.java +++ b/exec/java-exec/src/main/codegen/templates/ComplexReaders.java @@ -89,7 +89,7 @@ public class ${nullMode}${name}ReaderImpl extends AbstractFieldReader { } public int size(){ - return vector.getAccessor().getCount(idx()); + return vector.getAccessor().getInnerValueCountAt(idx()); } public void read(int arrayIndex, ${minor.class?cap_first}Holder h){ http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/codegen/templates/ComplexWriters.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java index 49c75d1..980f9ac 100644 --- a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java +++ b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java @@ -77,24 +77,24 @@ public class ${eName}WriterImpl extends AbstractFieldWriter { 
public void write(${minor.class?cap_first}Holder h){ mutator.addSafe(idx(), h); - vector.getMutator().setValueCount(idx()); + vector.getMutator().setValueCount(idx()+1); } public void write(Nullable${minor.class?cap_first}Holder h){ mutator.addSafe(idx(), h); - vector.getMutator().setValueCount(idx()); + vector.getMutator().setValueCount(idx()+1); } <#if !(minor.class == "Decimal9" || minor.class == "Decimal18" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense")> public void write${minor.class}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, ){ mutator.addSafe(idx(), <#list fields as field>${field.name}<#if field_has_next>, ); - vector.getMutator().setValueCount(idx()); + vector.getMutator().setValueCount(idx()+1); } public void setPosition(int idx){ super.setPosition(idx); - mutator.startNewGroup(idx); + mutator.startNewValue(idx); } @@ -102,24 +102,24 @@ public class ${eName}WriterImpl extends AbstractFieldWriter { public void write(${minor.class}Holder h){ mutator.setSafe(idx(), h); - vector.getMutator().setValueCount(idx()); + vector.getMutator().setValueCount(idx()+1); } public void write(Nullable${minor.class}Holder h){ mutator.setSafe(idx(), h); - vector.getMutator().setValueCount(idx()); + vector.getMutator().setValueCount(idx()+1); } <#if !(minor.class == "Decimal9" || minor.class == "Decimal18" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense")> public void write${minor.class}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, ){ mutator.setSafe(idx(), <#if mode == "Nullable">1, <#list fields as field>${field.name}<#if field_has_next>, ); - vector.getMutator().setValueCount(idx()); + vector.getMutator().setValueCount(idx()+1); } <#if mode == "Nullable"> public void writeNull(){ mutator.setNull(idx()); - 
vector.getMutator().setValueCount(idx()); + vector.getMutator().setValueCount(idx()+1); } http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java index a805b8e..7d85810 100644 --- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java @@ -764,6 +764,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F allocationMonitor = 0; } VectorTrimmer.trim(data, idx); + data.writerIndex(valueCount * ${type.width}); } http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/codegen/templates/ListWriters.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/ListWriters.java b/exec/java-exec/src/main/codegen/templates/ListWriters.java index 6df4248..ab78603 100644 --- a/exec/java-exec/src/main/codegen/templates/ListWriters.java +++ b/exec/java-exec/src/main/codegen/templates/ListWriters.java @@ -46,7 +46,7 @@ public class ${mode}ListWriter extends AbstractFieldWriter{ protected final ${containerClass} container; private Mode mode = Mode.INIT; private FieldWriter writer; - protected RepeatedVector innerVector; + protected RepeatedValueVector innerVector; <#if mode == "Repeated">private int currentChildIndex = 0; public ${mode}ListWriter(String name, ${containerClass} container, FieldWriter parent){ @@ -158,7 +158,7 @@ public class ${mode}ListWriter extends AbstractFieldWriter{ public void start(){ final RepeatedListVector list = (RepeatedListVector) container; - final RepeatedListVector.Mutator mutator = list.getMutator(); + final RepeatedListVector.RepeatedMutator mutator = 
list.getMutator(); // make sure that the current vector can support the end position of this list. if(container.getValueCapacity() <= idx()){ @@ -169,7 +169,7 @@ public class ${mode}ListWriter extends AbstractFieldWriter{ RepeatedListHolder h = new RepeatedListHolder(); list.getAccessor().get(idx(), h); if(h.start >= h.end){ - mutator.startNewGroup(idx()); + mutator.startNewValue(idx()); } currentChildIndex = container.getMutator().add(idx()); if(writer != null){ http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/codegen/templates/MapWriters.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/MapWriters.java b/exec/java-exec/src/main/codegen/templates/MapWriters.java index 38df84b..06a6813 100644 --- a/exec/java-exec/src/main/codegen/templates/MapWriters.java +++ b/exec/java-exec/src/main/codegen/templates/MapWriters.java @@ -116,7 +116,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter{ map.getAccessor().get(idx(), h); if(h.start >= h.end){ - container.getMutator().startNewGroup(idx()); + container.getMutator().startNewValue(idx()); } currentChildIndex = container.getMutator().add(idx()); for(FieldWriter w: fields.values()){ http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java index c0fba66..37b8fac 100644 --- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java @@ -19,9 +19,9 @@ import java.lang.Override; import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.vector.BaseRepeatedValueVector; import 
org.apache.drill.exec.vector.BaseValueVector; -import org.apache.drill.exec.vector.RepeatedFixedWidthVector; -import org.apache.drill.exec.vector.UInt4Vector; +import org.apache.drill.exec.vector.RepeatedFixedWidthVectorLike; import org.mortbay.jetty.servlet.Holder; <@pp.dropOutputFile /> @@ -48,14 +48,11 @@ package org.apache.drill.exec.vector; * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker. */ -public final class Repeated${minor.class}Vector extends BaseValueVector implements Repeated<#if type.major == "VarLen">VariableWidth<#else>FixedWidthVector { +public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector implements Repeated<#if type.major == "VarLen">VariableWidth<#else>FixedWidthVectorLike { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Repeated${minor.class}Vector.class); - private int parentValueCount; - private int childValueCount; - - private final UInt4Vector offsets; // offsets to start of each record - private final ${minor.class}Vector values; + // we maintain local reference to concrete vector type for performance reasons. 
+ private ${minor.class}Vector values; private final FieldReader reader = new Repeated${minor.class}ReaderImpl(Repeated${minor.class}Vector.this); private final Mutator mutator = new Mutator(); private final Accessor accessor = new Accessor(); @@ -63,56 +60,57 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen public Repeated${minor.class}Vector(MaterializedField field, BufferAllocator allocator) { super(field, allocator); - this.offsets = new UInt4Vector(null, allocator); - MaterializedField mf = MaterializedField.create(field.getPath(), Types.required(field.getType().getMinorType())); - this.values = new ${minor.class}Vector(mf, allocator); + addOrGetVector(VectorDescriptor.create(Types.required(field.getType().getMinorType()))); } @Override - public FieldReader getReader(){ - return reader; + public Mutator getMutator() { + return mutator; } - public int getValueCapacity(){ - return Math.min(values.getValueCapacity(), offsets.getValueCapacity() - 1); + @Override + public Accessor getAccessor() { + return accessor; } - public int getBufferSize(){ - if(accessor.getGroupCount() == 0){ - return 0; - } - return offsets.getBufferSize() + values.getBufferSize(); + @Override + public FieldReader getReader(){ + return reader; } - public UInt4Vector getOffsetVector(){ - return offsets; - } - - public ${minor.class}Vector getValuesVector(){ + @Override + public ${minor.class}Vector getDataVector(){ return values; } - - public DrillBuf getBuffer(){ - return values.getBuffer(); - } - + + @Override public TransferPair getTransferPair(){ return new TransferImpl(getField()); } + + @Override public TransferPair getTransferPair(FieldReference ref){ return new TransferImpl(getField().clone(ref)); } + @Override public TransferPair makeTransferPair(ValueVector to) { return new TransferImpl((Repeated${minor.class}Vector) to); } - + + @Override + public AddOrGetResult<${minor.class}Vector> addOrGetVector(VectorDescriptor descriptor) { + final 
AddOrGetResult<${minor.class}Vector> result = super.addOrGetVector(descriptor); + if (result.isCreated()) { + values = result.getVector(); + } + return result; + } + public void transferTo(Repeated${minor.class}Vector target){ target.clear(); offsets.transferTo(target.offsets); values.transferTo(target.values); - target.parentValueCount = parentValueCount; - target.childValueCount = childValueCount; clear(); } @@ -132,8 +130,6 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen normalizedPos = a.get(startIndex+i) - startPos; m.set(i, normalizedPos); } - to.parentValueCount = groups; - to.childValueCount = valuesToCopy; m.setValueCount(groups == 0 ? 0 : groups + 1); } @@ -167,33 +163,27 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen } public void copyFrom(int inIndex, int outIndex, Repeated${minor.class}Vector v){ - int count = v.getAccessor().getCount(inIndex); - getMutator().startNewGroup(outIndex); + final int count = v.getAccessor().getInnerValueCountAt(inIndex); + getMutator().startNewValue(outIndex); for (int i = 0; i < count; i++) { getMutator().add(outIndex, v.getAccessor().get(inIndex, i)); } } public void copyFromSafe(int inIndex, int outIndex, Repeated${minor.class}Vector v){ - int count = v.getAccessor().getCount(inIndex); - getMutator().startNewGroup(outIndex); + final int count = v.getAccessor().getInnerValueCountAt(inIndex); + getMutator().startNewValue(outIndex); for (int i = 0; i < count; i++) { getMutator().addSafe(outIndex, v.getAccessor().get(inIndex, i)); } } - @Override - public void setInitialCapacity(int numRecords) { - offsets.setInitialCapacity(numRecords + 1); - values.setInitialCapacity(numRecords * DEFAULT_REPEAT_PER_RECORD); - } public boolean allocateNewSafe(){ if(!offsets.allocateNewSafe()) return false; offsets.zeroVector(); if(!values.allocateNewSafe()) return false; mutator.reset(); - accessor.reset(); return true; } @@ -202,36 +192,28 @@ public final class 
Repeated${minor.class}Vector extends BaseValueVector implemen offsets.zeroVector(); values.allocateNew(); mutator.reset(); - accessor.reset(); } <#if type.major == "VarLen"> @Override - public SerializedField getMetadata() { - return getMetadataBuilder() // - .setGroupCount(this.parentValueCount) // - .setValueCount(this.childValueCount) // - .setVarByteLength(values.getVarByteLength()) // - .setBufferLength(getBufferSize()) // - .build(); + protected SerializedField.Builder getMetadataBuilder() { + return super.getMetadataBuilder() + .setVarByteLength(values.getVarByteLength()); } - public void allocateNew(int totalBytes, int parentValueCount, int childValueCount) { - offsets.allocateNew(parentValueCount+1); + public void allocateNew(int totalBytes, int valueCount, int innerValueCount) { + offsets.allocateNew(valueCount+1); offsets.zeroVector(); - values.allocateNew(totalBytes, childValueCount); + values.allocateNew(totalBytes, innerValueCount); mutator.reset(); - accessor.reset(); } @Override - public int load(int dataBytes, int parentValueCount, int childValueCount, DrillBuf buf){ + public int load(int dataBytes, int valueCount, int innerValueCount, DrillBuf buf){ clear(); - this.parentValueCount = parentValueCount; - this.childValueCount = childValueCount; int loaded = 0; - loaded += offsets.load(parentValueCount+1, buf.slice(loaded, buf.capacity() - loaded)); - loaded += values.load(dataBytes + 4*(childValueCount + 1), childValueCount, buf.slice(loaded, buf.capacity() - loaded)); + loaded += offsets.load(valueCount+1, buf.slice(loaded, buf.capacity() - loaded)); + loaded += values.load(dataBytes + 4*(innerValueCount + 1), innerValueCount, buf.slice(loaded, buf.capacity() - loaded)); return loaded; } @@ -247,32 +229,20 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen } <#else> - - @Override - public SerializedField getMetadata() { - return getMetadataBuilder() - .setGroupCount(this.parentValueCount) - 
.setValueCount(this.childValueCount) - .setBufferLength(getBufferSize()) - .build(); - } - - public void allocateNew(int parentValueCount, int childValueCount) { + + public void allocateNew(int valueCount, int innerValueCount) { clear(); - offsets.allocateNew(parentValueCount+1); + offsets.allocateNew(valueCount+1); offsets.zeroVector(); - values.allocateNew(childValueCount); + values.allocateNew(innerValueCount); mutator.reset(); - accessor.reset(); } - public int load(int parentValueCount, int childValueCount, DrillBuf buf){ + public int load(int valueCount, int innerValueCount, DrillBuf buf){ clear(); - this.parentValueCount = parentValueCount; - this.childValueCount = childValueCount; int loaded = 0; - loaded += offsets.load(parentValueCount+1, buf.slice(loaded, buf.capacity() - loaded)); - loaded += values.load(childValueCount, buf.slice(loaded, buf.capacity() - loaded)); + loaded += offsets.load(valueCount+1, buf.slice(loaded, buf.capacity() - loaded)); + loaded += values.load(innerValueCount, buf.slice(loaded, buf.capacity() - loaded)); return loaded; } @@ -284,49 +254,12 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen } - @Override - public DrillBuf[] getBuffers(boolean clear) { - DrillBuf[] buffers = ObjectArrays.concat(offsets.getBuffers(false), values.getBuffers(false), DrillBuf.class); - if (clear) { - for (DrillBuf buffer:buffers) { - buffer.retain(); - } - clear(); - } - return buffers; - } - - public void clear(){ - offsets.clear(); - values.clear(); - parentValueCount = 0; - childValueCount = 0; - } - - public Mutator getMutator(){ - return mutator; - } - - public Accessor getAccessor(){ - return accessor; - } // This is declared a subclass of the accessor declared inside of FixedWidthVector, this is also used for // variable length vectors, as they should ahve consistent interface as much as possible, if they need to diverge // in the future, the interface shold be declared in the respective value vector 
superclasses for fixed and variable // and we should refer to each in the generation template - public final class Accessor extends BaseValueVector.BaseAccessor implements RepeatedFixedWidthVector.RepeatedAccessor{ - - /** - * Get the elements at the given index. - */ - public int getCount(int index) { - return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index); - } - - public ValueVector getAllChildValues() { - return values; - } + public final class Accessor extends BaseRepeatedValueVector.BaseRepeatedAccessor { public List<${friendlyType}> getObject(int index) { List<${friendlyType}> vals = new JsonStringArrayList(); @@ -337,10 +270,6 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen } return vals; } - - public int getGroupSizeAtIndex(int index){ - return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index); - } public ${friendlyType} getSingleObject(int index, int arrayIndex){ int start = offsets.getAccessor().get(index); @@ -360,12 +289,7 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen get(int index, int positionIndex) { return values.getAccessor().get(offsets.getAccessor().get(index) + positionIndex); } - - - public boolean isNull(int index){ - return false; - } - + public void get(int index, Repeated${minor.class}Holder holder){ holder.start = offsets.getAccessor().get(index); holder.end = offsets.getAccessor().get(index+1); @@ -375,61 +299,24 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen public void get(int index, int positionIndex, ${minor.class}Holder holder) { int offset = offsets.getAccessor().get(index); assert offset >= 0; - assert positionIndex < getCount(index); + assert positionIndex < getInnerValueCountAt(index); values.getAccessor().get(offset + positionIndex, holder); } public void get(int index, int positionIndex, Nullable${minor.class}Holder holder) { int offset = offsets.getAccessor().get(index); 
assert offset >= 0; - if (positionIndex >= getCount(index)) { + if (positionIndex >= getInnerValueCountAt(index)) { holder.isSet = 0; return; } values.getAccessor().get(offset + positionIndex, holder); } - - public MaterializedField getField() { - return field; - } - - public int getGroupCount(){ - return parentValueCount; - } - - public int getValueCount(){ - return childValueCount; - } - - public void reset(){ - - } } - public final class Mutator extends BaseValueVector.BaseMutator implements RepeatedMutator { - - - private Mutator(){ - } - - public void setRepetitionAtIndexSafe(int index, int repetitionCount) { - offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index) + repetitionCount); - } - - public BaseDataValueVector getDataVector() { - return values; - } - - public void setValueCounts(int parentValueCount, int childValueCount){ - Repeated${minor.class}Vector.this.parentValueCount = parentValueCount; - Repeated${minor.class}Vector.this.childValueCount = childValueCount; - values.getMutator().setValueCount(childValueCount); - offsets.getMutator().setValueCount(parentValueCount == 0 ? 0 : parentValueCount + 1); - } + public final class Mutator extends BaseRepeatedValueVector.BaseRepeatedMutator implements RepeatedMutator { - public void startNewGroup(int index) { - offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index)); - } + private Mutator() { } /** * Add an element to the given record index. 
This is similar to the set() method in other @@ -468,7 +355,7 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen public void setSafe(int index, Repeated${minor.class}Holder h){ ${minor.class}Holder ih = new ${minor.class}Holder(); - getMutator().startNewGroup(index); + getMutator().startNewValue(index); for(int i = h.start; i < h.end; i++){ h.vector.getAccessor().get(i, ih); getMutator().addSafe(index, ih); @@ -511,17 +398,6 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen add(index, innerHolder); } } - - /** - * Set the number of value groups in this repeated field. - * @param groupCount Count of Value Groups. - */ - public void setValueCount(int groupCount) { - parentValueCount = groupCount; - childValueCount = offsets.getAccessor().get(groupCount); - offsets.getMutator().setValueCount(groupCount == 0 ? 0 : groupCount+1); - values.getMutator().setValueCount(childValueCount); - } public void generateTestData(final int valCount){ int[] sizes = {1,2,0,6}; http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SchemaChangeRuntimeException.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SchemaChangeRuntimeException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SchemaChangeRuntimeException.java new file mode 100644 index 0000000..f2a7e63 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SchemaChangeRuntimeException.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.exception; + +import org.apache.drill.common.exceptions.DrillRuntimeException; + +public class SchemaChangeRuntimeException extends DrillRuntimeException { + public SchemaChangeRuntimeException() { + super(); + } + + public SchemaChangeRuntimeException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + + public SchemaChangeRuntimeException(String message, Throwable cause) { + super(message, cause); + } + + public SchemaChangeRuntimeException(String message) { + super(message); + } + + public SchemaChangeRuntimeException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index 7a5b352..00a78fd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -21,15 +21,13 @@ import 
java.io.IOException; import java.util.List; import com.carrotsearch.hppc.IntOpenHashSet; -import com.google.common.base.Preconditions; -import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.logical.data.NamedExpression; -import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; @@ -52,9 +50,8 @@ import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.vector.RepeatedVector; +import org.apache.drill.exec.vector.RepeatedValueVector; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.complex.MapVector; import org.apache.drill.exec.vector.complex.RepeatedMapVector; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; @@ -129,13 +126,13 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch { private void setFlattenVector() { try { - flattener.setFlattenField((RepeatedVector) incoming.getValueAccessorById( - incoming.getSchema().getColumn( - incoming.getValueVectorId( - popConfig.getColumn()).getFieldIds()[0]).getValueClass(), - incoming.getValueVectorId(popConfig.getColumn()).getFieldIds()).getValueVector()); + final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn()); + final MaterializedField field = 
incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]); + final RepeatedValueVector vector = RepeatedValueVector.class.cast(incoming.getValueAccessorById( + field.getValueClass(), typedFieldId.getFieldIds()).getValueVector()); + flattener.setFlattenField(vector); } catch (Exception ex) { - throw new DrillRuntimeException("Trying to flatten a non-repeated filed."); + throw UserException.unsupportedError(ex).message("Trying to flatten a non-repeated field.").build(); } } @@ -152,7 +149,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch { // inside of the the flattener for the current batch setFlattenVector(); - int childCount = incomingRecordCount == 0 ? 0 : flattener.getFlattenField().getAccessor().getValueCount(); + int childCount = incomingRecordCount == 0 ? 0 : flattener.getFlattenField().getAccessor().getInnerValueCount(); int outputRecords = flattener.flattenRecords(0, incomingRecordCount, 0); // TODO - change this to be based on the repeated vector length if (outputRecords < childCount) { @@ -178,7 +175,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch { } private void handleRemainder() { - int remainingRecordCount = flattener.getFlattenField().getAccessor().getValueCount() - remainderIndex; + int remainingRecordCount = flattener.getFlattenField().getAccessor().getInnerValueCount() - remainderIndex; if (!doAlloc()) { outOfMemory = true; return; @@ -271,7 +268,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch { if (flattenField instanceof RepeatedMapVector) { tp = ((RepeatedMapVector)flattenField).getTransferPairToSingleMap(reference); } else { - ValueVector vvIn = ((RepeatedVector)flattenField).getAccessor().getAllChildValues(); + final ValueVector vvIn = RepeatedValueVector.class.cast(flattenField).getDataVector(); // vvIn may be null because of fast schema return for repeated list vectors if (vvIn != null) { tp = vvIn.getTransferPair(reference); 
http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java index 96209a2..b8d040c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java @@ -31,8 +31,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4; import com.google.common.collect.ImmutableList; -import org.apache.drill.exec.vector.RepeatedFixedWidthVector.RepeatedAccessor; -import org.apache.drill.exec.vector.RepeatedVector; +import org.apache.drill.exec.vector.RepeatedValueVector; public abstract class FlattenTemplate implements Flattener { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenTemplate.class); @@ -43,9 +42,9 @@ public abstract class FlattenTemplate implements Flattener { private SelectionVector2 vector2; private SelectionVector4 vector4; private SelectionVectorMode svMode; - RepeatedVector fieldToFlatten; - RepeatedAccessor accessor; - private int groupIndex; + private RepeatedValueVector fieldToFlatten; + private RepeatedValueVector.RepeatedAccessor accessor; + private int valueIndex; // this allows for groups to be written between batches if we run out of space, for cases where we have finished // a batch on the boundary it will be set to 0 @@ -60,12 +59,12 @@ public abstract class FlattenTemplate implements Flattener { } @Override - public void setFlattenField(RepeatedVector flattenField) { + public void setFlattenField(RepeatedValueVector flattenField) { this.fieldToFlatten = flattenField; - this.accessor = flattenField.getAccessor(); + 
this.accessor = RepeatedValueVector.RepeatedAccessor.class.cast(flattenField.getAccessor()); } - public RepeatedVector getFlattenField() { + public RepeatedValueVector getFlattenField() { return fieldToFlatten; } @@ -84,14 +83,14 @@ public abstract class FlattenTemplate implements Flattener { childIndexWithinCurrGroup = 0; } outer: { - final int groupCount = accessor.getGroupCount(); - for ( ; groupIndex < groupCount; groupIndex++) { - currGroupSize = accessor.getGroupSizeAtIndex(groupIndex); + final int valueCount = accessor.getValueCount(); + for ( ; valueIndex < valueCount; valueIndex++) { + currGroupSize = accessor.getInnerValueCountAt(valueIndex); for ( ; childIndexWithinCurrGroup < currGroupSize; childIndexWithinCurrGroup++) { if (firstOutputIndex == OUTPUT_BATCH_SIZE) { break outer; } - doEval(groupIndex, firstOutputIndex); + doEval(valueIndex, firstOutputIndex); firstOutputIndex++; childIndex++; } @@ -133,7 +132,7 @@ public abstract class FlattenTemplate implements Flattener { @Override public void resetGroupIndex() { - this.groupIndex = 0; + this.valueIndex = 0; this.currGroupSize = 0; this.childIndex = 0; } http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java index 2141ca2..323bf43 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java @@ -24,14 +24,14 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; 
-import org.apache.drill.exec.vector.RepeatedVector; +import org.apache.drill.exec.vector.RepeatedValueVector; public interface Flattener { public abstract void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List transfers) throws SchemaChangeException; public abstract int flattenRecords(int startIndex, int recordCount, int firstOutputIndex); - public void setFlattenField(RepeatedVector repeatedColumn); - public RepeatedVector getFlattenField(); + public void setFlattenField(RepeatedValueVector repeatedColumn); + public RepeatedValueVector getFlattenField(); public void resetGroupIndex(); public static TemplateClassDefinition TEMPLATE_DEFINITION = new TemplateClassDefinition(Flattener.class, FlattenTemplate.class); http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 32ffb6f..946d117 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -63,6 +63,7 @@ import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.AbstractContainerVector; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; import com.carrotsearch.hppc.IntOpenHashSet; @@ -415,7 +416,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch { // The reference name will be passed to ComplexWriter, used as the 
name of the output vector from the writer. ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef()); cg.addExpr(expr); - } else{ + } else { // need to do evaluation. final ValueVector vector = container.addOrGet(outputField, callBack); allocationVectors.add(vector); @@ -424,6 +425,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch { final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe); final HoldingContainer hc = cg.addExpr(write); + // We cannot do multiple transfers from the same vector. However we still need to instantiate the output vector. + if (expr instanceof ValueVectorReadExpression) { + final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr; + if (!vectorRead.hasReadPath()) { + final TypedFieldId id = vectorRead.getFieldId(); + final ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector(); + vvIn.makeTransferPair(vector); + } + } logger.debug("Added eval for project expression."); } } http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java index 8387d49..e602fd7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java @@ -18,9 +18,9 @@ package org.apache.drill.exec.store; import org.apache.drill.exec.vector.AllocationHelper; -import org.apache.drill.exec.vector.RepeatedFixedWidthVector; +import org.apache.drill.exec.vector.RepeatedFixedWidthVectorLike; import org.apache.drill.exec.vector.RepeatedMutator; -import 
org.apache.drill.exec.vector.RepeatedVariableWidthVector; +import org.apache.drill.exec.vector.RepeatedVariableWidthVectorLike; import org.apache.drill.exec.vector.ValueVector; public class VectorHolder { @@ -35,7 +35,7 @@ public class VectorHolder { public VectorHolder(int length, ValueVector vector) { this.length = length; this.vector = vector; - if (vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) { + if (vector instanceof RepeatedFixedWidthVectorLike || vector instanceof RepeatedVariableWidthVectorLike) { repeated = true; } } @@ -43,7 +43,7 @@ public class VectorHolder { public VectorHolder(ValueVector vector) { this.length = vector.getValueCapacity(); this.vector = vector; - if (vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) { + if (vector instanceof RepeatedFixedWidthVectorLike || vector instanceof RepeatedVariableWidthVectorLike) { repeated = true; } } @@ -90,7 +90,7 @@ public class VectorHolder { public void populateVectorLength() { ValueVector.Mutator mutator = vector.getMutator(); - if (vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) { + if (vector instanceof RepeatedFixedWidthVectorLike || vector instanceof RepeatedVariableWidthVectorLike) { mutator.setValueCount(groupCount); } else { mutator.setValueCount(count); http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java index 3ad5c2a..40276f4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java @@ -192,7 +192,7 @@ class RepeatedVarCharOutput extends TextOutput { } private void loadVarCharDataAddress(){ - DrillBuf buf = vector.getValuesVector().getBuffer(); + DrillBuf buf = vector.getDataVector().getBuffer(); checkBuf(buf); this.characterData = buf.memoryAddress(); this.characterDataOriginal = buf.memoryAddress(); @@ -200,7 +200,7 @@ class RepeatedVarCharOutput extends TextOutput { } private void loadVarCharOffsetAddress(){ - DrillBuf buf = vector.getValuesVector().getOffsetVector().getBuffer(); + DrillBuf buf = vector.getDataVector().getOffsetVector().getBuffer(); checkBuf(buf); this.charLengthOffset = buf.memoryAddress() + 4; this.charLengthOffsetOriginal = buf.memoryAddress() + 4; // add four as offsets conceptually start at 1. (first item is 0..1) @@ -208,14 +208,14 @@ class RepeatedVarCharOutput extends TextOutput { } private void expandVarCharOffsets(){ - vector.getValuesVector().getOffsetVector().reAlloc(); + vector.getDataVector().getOffsetVector().reAlloc(); long diff = charLengthOffset - charLengthOffsetOriginal; loadVarCharOffsetAddress(); charLengthOffset += diff; } private void expandVarCharData(){ - vector.getValuesVector().reAlloc(); + vector.getDataVector().reAlloc(); long diff = characterData - characterDataOriginal; loadVarCharDataAddress(); characterData += diff; http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java index 7f8b611..2b929a4 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java @@ -20,7 +20,10 @@ package org.apache.drill.exec.store.parquet.columnreaders; import java.io.IOException; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.vector.RepeatedFixedWidthVector; +import org.apache.drill.exec.vector.BaseDataValueVector; +import org.apache.drill.exec.vector.RepeatedFixedWidthVectorLike; +import org.apache.drill.exec.vector.RepeatedValueVector; +import org.apache.drill.exec.vector.UInt4Vector; import org.apache.drill.exec.vector.ValueVector; import parquet.column.ColumnDescriptor; @@ -29,7 +32,7 @@ import parquet.hadoop.metadata.ColumnChunkMetaData; public class FixedWidthRepeatedReader extends VarLengthColumn { - RepeatedFixedWidthVector castedRepeatedVector; + RepeatedValueVector castedRepeatedVector; ColumnReader dataReader; int dataTypeLengthInBytes; // we can do a vector copy of the data once we figure out how much we need to copy @@ -47,9 +50,9 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { boolean notFishedReadingList; byte[] leftOverBytes; - FixedWidthRepeatedReader(ParquetRecordReader parentReader, ColumnReader dataReader, int dataTypeLengthInBytes, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector valueVector, SchemaElement schemaElement) throws ExecutionSetupException { + FixedWidthRepeatedReader(ParquetRecordReader parentReader, ColumnReader dataReader, int dataTypeLengthInBytes, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, RepeatedValueVector valueVector, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, valueVector, 
schemaElement); - castedRepeatedVector = (RepeatedFixedWidthVector) valueVector; + this.castedRepeatedVector = valueVector; this.dataTypeLengthInBytes = dataTypeLengthInBytes; this.dataReader = dataReader; this.dataReader.pageReader.clear(); @@ -65,7 +68,7 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { bytesReadInCurrentPass = 0; valuesReadInCurrentPass = 0; pageReader.valuesReadyToRead = 0; - dataReader.vectorData = castedRepeatedVector.getMutator().getDataVector().getBuffer(); + dataReader.vectorData = BaseDataValueVector.class.cast(castedRepeatedVector.getDataVector()).getBuffer(); dataReader.valuesReadInCurrentPass = 0; repeatedGroupsReadInCurrentPass = 0; } @@ -200,8 +203,8 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { currentValueListLength += numLeftoverVals; } // this should not fail - castedRepeatedVector.getMutator().setRepetitionAtIndexSafe(repeatedGroupsReadInCurrentPass, - currentValueListLength); + final UInt4Vector offsets = castedRepeatedVector.getOffsetVector(); + offsets.getMutator().setSafe(repeatedGroupsReadInCurrentPass + 1, offsets.getAccessor().get(repeatedGroupsReadInCurrentPass)); // This field is being referenced in the superclass determineSize method, so we need to set it here // again going to make this the length in BYTES to avoid repetitive multiplication/division dataTypeLengthInBits = repeatedValuesInCurrentList * dataTypeLengthInBytes; @@ -218,12 +221,13 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { dataReader.valuesReadInCurrentPass = 0; dataReader.readValues(valuesToRead); valuesReadInCurrentPass += valuesToRead; - castedRepeatedVector.getMutator().setValueCounts(repeatedGroupsReadInCurrentPass, valuesReadInCurrentPass); + castedRepeatedVector.getMutator().setValueCount(repeatedGroupsReadInCurrentPass); + castedRepeatedVector.getDataVector().getMutator().setValueCount(valuesReadInCurrentPass); } @Override public int capacity() { - return 
castedRepeatedVector.getMutator().getDataVector().getBuffer().capacity(); + return BaseDataValueVector.class.cast(castedRepeatedVector.getDataVector()).getBuffer().capacity(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index 2f07fb3..0cbd480 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -41,7 +41,7 @@ import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.parquet.DirectCodecFactory; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.NullableIntVector; -import org.apache.drill.exec.vector.RepeatedFixedWidthVector; +import org.apache.drill.exec.vector.RepeatedValueVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -274,7 +274,7 @@ public class ParquetRecordReader extends AbstractRecordReader { } try { - ValueVector v; + ValueVector vector; SchemaElement schemaElement; ArrayList varLengthColumns = new ArrayList<>(); // initialize all of the column read status objects @@ -292,23 +292,24 @@ public class ParquetRecordReader extends AbstractRecordReader { } fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY; - v = output.addField(field, (Class) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); + vector = output.addField(field, (Class) 
TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) { if (column.getMaxRepetitionLevel() > 0) { + final RepeatedValueVector repeatedVector = RepeatedValueVector.class.cast(vector); ColumnReader dataReader = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, - ((RepeatedFixedWidthVector) v).getMutator().getDataVector(), schemaElement); + repeatedVector.getDataVector(), schemaElement); varLengthColumns.add(new FixedWidthRepeatedReader(this, dataReader, - getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData, false, v, schemaElement)); + getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData, false, repeatedVector, schemaElement)); } else { columnStatuses.add(ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, - column, columnChunkMetaData, recordsPerBatch, v, + column, columnChunkMetaData, recordsPerBatch, vector, schemaElement)); } } else { // create a reader and add it to the appropriate list - varLengthColumns.add(ColumnReaderFactory.getReader(this, -1, column, columnChunkMetaData, false, v, schemaElement)); + varLengthColumns.add(ColumnReaderFactory.getReader(this, -1, column, columnChunkMetaData, false, vector, schemaElement)); } } varLengthReader = new VarLenBinaryReader(this, varLengthColumns); http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java index e25bd74..c59ade9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java @@ -162,7 +162,7 @@ public class DrillTextRecordReader extends AbstractRecordReader { // index of the scanned field int p = 0; int i = 0; - vector.getMutator().startNewGroup(recordCount); + vector.getMutator().startNewValue(recordCount); // Process each field in this line while (end < value.getLength() - 1) { if(numCols > 0 && p >= numCols) { http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AddOrGetResult.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AddOrGetResult.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AddOrGetResult.java new file mode 100644 index 0000000..7d1f08d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AddOrGetResult.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.vector; + +import com.google.common.base.Preconditions; + +public class AddOrGetResult { + private final V vector; + private final boolean created; + + public AddOrGetResult(V vector, boolean created) { + this.vector = Preconditions.checkNotNull(vector); + this.created = created; + } + + public V getVector() { + return vector; + } + + public boolean isCreated() { + return created; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java index bf465c7..eddefd0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java @@ -31,10 +31,10 @@ public class AllocationHelper { ((FixedWidthVector) v).allocateNew(valueCount); } else if (v instanceof VariableWidthVector) { ((VariableWidthVector) v).allocateNew(valueCount * bytesPerValue, valueCount); - }else if(v instanceof RepeatedFixedWidthVector){ - ((RepeatedFixedWidthVector) v).allocateNew(valueCount, childValCount); - }else if(v instanceof RepeatedVariableWidthVector){ - ((RepeatedVariableWidthVector) v).allocateNew(childValCount * bytesPerValue, valueCount, childValCount); + }else if(v instanceof RepeatedFixedWidthVectorLike){ + ((RepeatedFixedWidthVectorLike) v).allocateNew(valueCount, childValCount); + }else if(v instanceof RepeatedVariableWidthVectorLike){ + ((RepeatedVariableWidthVectorLike) v).allocateNew(childValCount * bytesPerValue, valueCount, childValCount); }else{ v.allocateNew(); } http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java index 0c6097c..6d356f2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java @@ -22,8 +22,8 @@ import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.record.MaterializedField; -public abstract class BaseDataValueVector, A extends BaseValueVector.BaseAccessor, - M extends BaseValueVector.BaseMutator> extends BaseValueVector { +public abstract class BaseDataValueVector + extends BaseValueVector { protected DrillBuf data; @@ -36,6 +36,7 @@ public abstract class BaseDataValueVector, A public void clear() { data.release(); data = allocator.getEmpty(); + super.clear(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseRepeatedValueVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseRepeatedValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseRepeatedValueVector.java new file mode 100644 index 0000000..bcf0793 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseRepeatedValueVector.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector; + +import java.util.Collections; +import java.util.Iterator; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ObjectArrays; +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.exception.SchemaChangeRuntimeException; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.record.MaterializedField; + +public abstract class BaseRepeatedValueVector + extends BaseValueVector implements RepeatedValueVector { + + public final static ValueVector DEFAULT_DATA_VECTOR = ZeroVector.INSTANCE; + public final static String OFFSETS_VECTOR_NAME = "offsets"; + public final static String DATA_VECTOR_NAME = "data"; + + private final static MaterializedField offsetsField = + MaterializedField.create(OFFSETS_VECTOR_NAME, Types.required(TypeProtos.MinorType.UINT4)); + + protected final UInt4Vector offsets; + protected ValueVector vector; + + protected BaseRepeatedValueVector(MaterializedField field, BufferAllocator allocator) { + this(field, allocator, DEFAULT_DATA_VECTOR); + } + + protected BaseRepeatedValueVector(MaterializedField field, BufferAllocator allocator, ValueVector vector) { + super(field, allocator); + this.offsets = new UInt4Vector(offsetsField, allocator); + this.vector = 
Preconditions.checkNotNull(vector, "data vector cannot be null"); + } + + @Override + public boolean allocateNewSafe() { + if (!offsets.allocateNewSafe()) { + return false; + } + offsets.zeroVector(); + return vector.allocateNewSafe(); + } + + @Override + public UInt4Vector getOffsetVector() { + return offsets; + } + + @Override + public ValueVector getDataVector() { + return vector; + } + + @Override + public void setInitialCapacity(int numRecords) { + offsets.setInitialCapacity(numRecords + 1); + vector.setInitialCapacity(numRecords * RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD); + } + + @Override + public int getValueCapacity() { + final int offsetValueCapacity = offsets.getValueCapacity() - 1; + if (vector == DEFAULT_DATA_VECTOR) { + return offsetValueCapacity; + } + return Math.min(vector.getValueCapacity(), offsetValueCapacity); + } + + @Override + protected UserBitShared.SerializedField.Builder getMetadataBuilder() { + return super.getMetadataBuilder() + .setGroupCount(getAccessor().getValueCount()) + .setValueCount(getAccessor().getInnerValueCount()) + .addChild(vector.getMetadata()); + } + + @Override + public int getBufferSize() { + if (getAccessor().getValueCount() == 0) { + return 0; + } + return offsets.getBufferSize() + vector.getBufferSize(); + } + + @Override + public Iterator iterator() { + return Collections.singleton(getDataVector()).iterator(); + } + + @Override + public void clear() { + offsets.clear(); + vector.clear(); + super.clear(); + } + + @Override + public DrillBuf[] getBuffers(boolean clear) { + final DrillBuf[] buffers = ObjectArrays.concat(offsets.getBuffers(false), vector.getBuffers(false), DrillBuf.class); + if (clear) { + for (DrillBuf buffer:buffers) { + buffer.retain(); + } + clear(); + } + return buffers; + } + + /** + * Returns 1 if inner vector is explicitly set via #addOrGetVector else 0 + * + * @see {@link ContainerVectorLike#size} + */ + @Override + public int size() { + return vector == DEFAULT_DATA_VECTOR ? 
0:1; + } + + @Override + public AddOrGetResult addOrGetVector(VectorDescriptor descriptor) { + boolean created = false; + if (vector == DEFAULT_DATA_VECTOR) { + vector = TypeHelper.getNewVector(MaterializedField.create(DATA_VECTOR_NAME, descriptor.getType()), allocator); + getField().addChild(vector.getField()); + created = true; + } + + final TypeProtos.MajorType actual = vector.getField().getType(); + if (!actual.equals(descriptor.getType())) { + final String msg = String.format("Inner vector type mismatch. Requested type: [%s], actual type: [%s]", + descriptor.getType(), actual); + throw new SchemaChangeRuntimeException(msg); + } + + return new AddOrGetResult<>((T)vector, created); + } + + public abstract class BaseRepeatedAccessor extends BaseValueVector.BaseAccessor implements RepeatedAccessor { + + @Override + public int getValueCount() { + return Math.max(offsets.getAccessor().getValueCount() - 1, 0); + } + + @Override + public int getInnerValueCount() { + return vector.getAccessor().getValueCount(); + } + + @Override + public int getInnerValueCountAt(int index) { + return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index); + } + + @Override + public boolean isNull(int index) { + return false; + } + + @Override + public boolean isEmpty(int index) { + return false; + } + } + + public abstract class BaseRepeatedMutator extends BaseValueVector.BaseMutator implements RepeatedMutator { + + @Override + public void startNewValue(int index) { + offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index)); + setValueCount(index+1); + } + + @Override + public void setValueCount(int valueCount) { + // TODO: populate offset end points + offsets.getMutator().setValueCount(valueCount == 0 ? 0 : valueCount+1); + final int childValueCount = valueCount == 0 ? 
0 : offsets.getAccessor().get(valueCount); + vector.getMutator().setValueCount(childValueCount); + } + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java index 22f0fe7..67c489d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java @@ -25,9 +25,10 @@ import org.apache.drill.common.expression.FieldReference; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.TransferPair; -public abstract class BaseValueVector, A extends BaseValueVector.BaseAccessor, - M extends BaseValueVector.BaseMutator> implements ValueVector { +public abstract class BaseValueVector + implements ValueVector { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseValueVector.class); protected final BufferAllocator allocator; @@ -40,6 +41,11 @@ public abstract class BaseValueVector, A exte } @Override + public void clear() { + getMutator().reset(); + } + + @Override public void close() { clear(); } @@ -54,6 +60,11 @@ public abstract class BaseValueVector, A exte } @Override + public TransferPair getTransferPair() { + return getTransferPair(new FieldReference(getField().getPath())); + } + + @Override public SerializedField getMetadata() { return getMetadataBuilder().build(); } @@ -76,11 +87,15 @@ public abstract class BaseValueVector, A exte public abstract static class BaseMutator implements ValueVector.Mutator { protected BaseMutator() { } + @Override + 
public void generateTestData(int values) { } + + //TODO: consider making mutator stateless(if possible) on another issue. public void reset() { } } @Override - public Iterator> iterator() { + public Iterator iterator() { return Iterators.emptyIterator(); } http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ContainerVectorLike.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ContainerVectorLike.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ContainerVectorLike.java new file mode 100644 index 0000000..95e3365 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ContainerVectorLike.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector; + +/** + * A mix-in used for introducing container vector-like behaviour. + */ +public interface ContainerVectorLike { + + /** + * Creates and adds a child vector if none with the same name exists, else returns the vector instance. 
+ * + * @param descriptor vector descriptor + * @return result of operation wrapping vector corresponding to the given descriptor and whether it's newly created + * @throws org.apache.drill.common.exceptions.DrillRuntimeException + * if schema change is not permissible between the given and existing data vector types. + */ + AddOrGetResult addOrGetVector(VectorDescriptor descriptor); + + /** + * Returns the number of child vectors in this container vector-like instance. + */ + int size(); +} http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java deleted file mode 100644 index eaae7ad..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.vector; - -import io.netty.buffer.DrillBuf; - -public interface RepeatedFixedWidthVector extends ValueVector, RepeatedVector { - /** - * Allocate a new memory space for this vector. Must be called prior to using the ValueVector. - * - * @param parentValueCount Number of separate repeating groupings. - * @param childValueCount Number of supported values in the vector. - */ - public void allocateNew(int parentValueCount, int childValueCount); - - /** - * Load the records in the provided buffer based on the given number of values. - * @param parentValueCount Number of separate repeating groupings. - * @param valueCount Number atomic values the buffer contains. - * @param buf Incoming buffer. - * @return The number of bytes of the buffer that were consumed. - */ - public int load(int parentValueCount, int childValueCount, DrillBuf buf); - - public abstract RepeatedMutator getMutator(); - - public interface RepeatedAccessor extends Accessor { - public int getGroupCount(); - public int getValueCount(); - public int getGroupSizeAtIndex(int index); - public ValueVector getAllChildValues(); - } - public interface RepeatedMutator extends Mutator { - public void setValueCounts(int parentValueCount, int childValueCount); - public void setRepetitionAtIndexSafe(int index, int repetitionCount); - public BaseDataValueVector getDataVector(); - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVectorLike.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVectorLike.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVectorLike.java new file mode 100644 index 0000000..450c673 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVectorLike.java @@ -0,0 +1,56 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector; + +import io.netty.buffer.DrillBuf; + +/** + * A {@link org.apache.drill.exec.vector.ValueVector} mix-in that can be used in conjunction with + * {@link org.apache.drill.exec.vector.RepeatedValueVector} subtypes. + */ +public interface RepeatedFixedWidthVectorLike { + /** + * Allocate a new memory space for this vector. Must be called prior to using the ValueVector. + * + * @param valueCount Number of separate repeating groupings. + * @param innerValueCount Number of supported values in the vector. + */ + public void allocateNew(int valueCount, int innerValueCount); + + /** + * Load the records in the provided buffer based on the given number of values. + * @param valueCount Number of separate repeating groupings. + * @param innerValueCount Number of atomic values the buffer contains. + * @param buf Incoming buffer. + * @return The number of bytes of the buffer that were consumed. 
+ */ + public int load(int valueCount, int innerValueCount, DrillBuf buf); + +// public interface RepeatedAccessor extends Accessor { +// public int getGroupCount(); +// public int getValueCount(); +// public int getGroupSizeAtIndex(int index); +// public ValueVector getAllChildValues(); +// } +// +// public interface RepeatedMutator extends Mutator { +// public void setValueCounts(int parentValueCount, int childValueCount); +// public void setRepetitionAtIndexSafe(int index, int repetitionCount); +// public BaseDataValueVector getDataVector(); +// } +} http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedValueVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedValueVector.java new file mode 100644 index 0000000..d5a8281 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedValueVector.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.vector; + +/** + * An abstraction representing repeated value vectors. + * + * A repeated vector contains values that may either be flat or nested. A value consists of zero or more cells (inner values). + * Current design maintains data and offsets vectors. Each cell is stored in the data vector. Repeated vector + * uses the offset vector to determine the sequence of cells pertaining to an individual value. + * + * @param <A> repeated accessor type + * @param <M> repeated mutator type + */ +public interface RepeatedValueVector + extends ValueVector, ContainerVectorLike { + + final static int DEFAULT_REPEAT_PER_RECORD = 5; + + /** + * Returns the underlying offset vector or null if none exists. + * + * TODO(DRILL-2995): eliminate exposing low-level interfaces. + */ + UInt4Vector getOffsetVector(); + + /** + * Returns the underlying data vector or null if none exists. + */ + ValueVector getDataVector(); + + @Override + A getAccessor(); + + @Override + M getMutator(); + + interface RepeatedAccessor extends ValueVector.Accessor { + /** + * Returns total number of cells that vector contains. + * + * The result includes empty, null valued cells. + */ + int getInnerValueCount(); + + + /** + * Returns number of cells that the value at the given index contains. + */ + int getInnerValueCountAt(int index); + + /** + * Returns true if the value at the given index is empty, false otherwise. + * + * @param index value index + */ + boolean isEmpty(int index); + } + + interface RepeatedMutator extends ValueVector.Mutator { + /** + * Starts a new value that is a container of cells. 
+ * + * @param index index of new value to start + */ + void startNewValue(int index); + + + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java deleted file mode 100644 index a499341..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.vector; - -import io.netty.buffer.DrillBuf; - -public interface RepeatedVariableWidthVector extends ValueVector, RepeatedVector { - /** - * Allocate a new memory space for this vector. Must be called prior to using the ValueVector. - * - * @param totalBytes Desired size of the underlying data buffer. - * @param parentValueCount Number of separate repeating groupings. - * @param childValueCount Number of supported values in the vector. 
- */ - public void allocateNew(int totalBytes, int parentValueCount, int childValueCount); - - /** - * Provide the maximum amount of variable width bytes that can be stored int his vector. - * @return - */ - public int getByteCapacity(); - - /** - * Load the records in the provided buffer based on the given number of values. - * @param dataBytes The number of bytes associated with the data array. - * @param parentValueCount Number of separate repeating groupings. - * @param childValueCount Number of supported values in the vector. - * @param buf Incoming buffer. - * @return The number of bytes of the buffer that were consumed. - */ - public int load(int dataBytes, int parentValueCount, int childValueCount, DrillBuf buf); -} http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVectorLike.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVectorLike.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVectorLike.java new file mode 100644 index 0000000..ac8589e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVectorLike.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector; + +import io.netty.buffer.DrillBuf; + +public interface RepeatedVariableWidthVectorLike { + /** + * Allocate a new memory space for this vector. Must be called prior to using the ValueVector. + * + * @param totalBytes Desired size of the underlying data buffer. + * @param parentValueCount Number of separate repeating groupings. + * @param childValueCount Number of supported values in the vector. + */ + public void allocateNew(int totalBytes, int parentValueCount, int childValueCount); + + /** + * Provide the maximum amount of variable width bytes that can be stored in this vector. + * @return + */ + public int getByteCapacity(); + + /** + * Load the records in the provided buffer based on the given number of values. + * @param dataBytes The number of bytes associated with the data array. + * @param parentValueCount Number of separate repeating groupings. + * @param childValueCount Number of supported values in the vector. + * @param buf Incoming buffer. + * @return The number of bytes of the buffer that were consumed. 
+ */ + public int load(int dataBytes, int parentValueCount, int childValueCount, DrillBuf buf); +} http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java deleted file mode 100644 index df4279a..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.vector; - -public interface RepeatedVector extends ValueVector { - public static final int DEFAULT_REPEAT_PER_RECORD = 4; - - public RepeatedFixedWidthVector.RepeatedAccessor getAccessor(); - -} http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java index 386ee34..de05131 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java @@ -28,9 +28,9 @@ public class SchemaChangeCallBack implements CallBack { } public boolean getSchemaChange() { - boolean schemaChange = this.schemaChange; - this.schemaChange = false; - return schemaChange; + final boolean current = schemaChange; + schemaChange = false; + return current; } }