Repository: asterixdb
Updated Branches:
refs/heads/master 3cb0b7835 -> 4b30d7901
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4b30d790/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index 974a079..688e819 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -108,10 +108,11 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
byte[] data = tb.getByteArray();
int[] fieldEnds = tb.getFieldEndOffsets();
int start = 0;
- int offset = 0;
+ int offset;
for (int i = 0; i < fieldEnds.length; i++) {
- if (i > 0)
+ if (i > 0) {
start = fieldEnds[i - 1];
+ }
offset = fieldEnds[i] - start;
tupleBuilder.addField(data, start, offset);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4b30d790/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
index 8df0002..b45879c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
@@ -29,20 +29,28 @@ import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescri
import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
public class PreclusteredGroupOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor
{
+ private static final long serialVersionUID = 1L;
+
private final int[] groupFields;
private final IBinaryComparatorFactory[] comparatorFactories;
private final IAggregatorDescriptorFactory aggregatorFactory;
-
- private static final long serialVersionUID = 1L;
+ private final boolean groupAll;
public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields,
IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory
aggregatorFactory,
RecordDescriptor recordDescriptor) {
+ this(spec, groupFields, comparatorFactories, aggregatorFactory, recordDescriptor,
false);
+ }
+
+ public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields,
+ IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory
aggregatorFactory,
+ RecordDescriptor recordDescriptor, boolean groupAll) {
super(spec, 1, 1);
this.groupFields = groupFields;
this.comparatorFactories = comparatorFactories;
this.aggregatorFactory = aggregatorFactory;
recordDescriptors[0] = recordDescriptor;
+ this.groupAll = groupAll;
}
@Override
@@ -50,6 +58,6 @@ public class PreclusteredGroupOperatorDescriptor extends AbstractSingleActivityO
final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
throws HyracksDataException {
return new PreclusteredGroupOperatorNodePushable(ctx, groupFields, comparatorFactories,
aggregatorFactory,
- recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), recordDescriptors[0]);
+ recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), recordDescriptors[0],
groupAll);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4b30d790/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
index 3286703..2acc4db 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
@@ -35,17 +35,20 @@ class PreclusteredGroupOperatorNodePushable extends AbstractUnaryInputUnaryOutpu
private final IAggregatorDescriptorFactory aggregatorFactory;
private final RecordDescriptor inRecordDescriptor;
private final RecordDescriptor outRecordDescriptor;
+ private final boolean groupAll;
+
private PreclusteredGroupWriter pgw;
PreclusteredGroupOperatorNodePushable(IHyracksTaskContext ctx, int[] groupFields,
IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory
aggregatorFactory,
- RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor) {
+ RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, boolean
groupAll) {
this.ctx = ctx;
this.groupFields = groupFields;
this.comparatorFactories = comparatorFactories;
this.aggregatorFactory = aggregatorFactory;
this.inRecordDescriptor = inRecordDescriptor;
this.outRecordDescriptor = outRecordDescriptor;
+ this.groupAll = groupAll;
}
@Override
@@ -55,7 +58,7 @@ class PreclusteredGroupOperatorNodePushable extends AbstractUnaryInputUnaryOutpu
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory,
inRecordDescriptor,
- outRecordDescriptor, writer);
+ outRecordDescriptor, writer, false, groupAll);
pgw.open();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4b30d790/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index b4e51be..7901141 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -47,22 +47,28 @@ public class PreclusteredGroupWriter implements IFrameWriter {
private final FrameTupleAppenderWrapper appenderWrapper;
private final ArrayTupleBuilder tupleBuilder;
- private boolean outputPartial = false;
-
+ private final boolean groupAll;
+ private final boolean outputPartial;
private boolean first;
-
private boolean isFailed = false;
public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[]
comparators,
IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
+ RecordDescriptor outRecordDesc, IFrameWriter writer) throws HyracksDataException
{
+ this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc,
writer, false, false);
+ }
+
+ public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[]
comparators,
+ IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
RecordDescriptor outRecordDesc, IFrameWriter writer, boolean outputPartial) throws
HyracksDataException {
- this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc,
writer);
- this.outputPartial = outputPartial;
+ this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc,
writer, outputPartial,
+ false);
}
public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[]
comparators,
IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
- RecordDescriptor outRecordDesc, IFrameWriter writer) throws HyracksDataException
{
+ RecordDescriptor outRecordDesc, IFrameWriter writer, boolean outputPartial, boolean
groupAll)
+ throws HyracksDataException {
this.groupFields = groupFields;
this.comparators = comparators;
this.aggregator =
@@ -79,6 +85,8 @@ public class PreclusteredGroupWriter implements IFrameWriter {
appenderWrapper = new FrameTupleAppenderWrapper(appender, writer);
tupleBuilder = new ArrayTupleBuilder(outRecordDesc.getFields().length);
+ this.outputPartial = outputPartial;
+ this.groupAll = groupAll;
}
@Override
@@ -176,8 +184,7 @@ public class PreclusteredGroupWriter implements IFrameWriter {
@Override
public void close() throws HyracksDataException {
try {
- if (!isFailed && !first) {
- assert (copyFrameAccessor.getTupleCount() > 0);
+ if (!isFailed && (!first || groupAll)) {
writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
appenderWrapper.write();
}
|