Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A5D4F200BA1 for ; Mon, 17 Oct 2016 21:54:55 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A4375160B0F; Mon, 17 Oct 2016 19:54:55 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 006D2160AF0 for ; Mon, 17 Oct 2016 21:54:52 +0200 (CEST) Received: (qmail 53091 invoked by uid 500); 17 Oct 2016 19:54:52 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 53051 invoked by uid 99); 17 Oct 2016 19:54:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 17 Oct 2016 19:54:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 00B6AE098D; Mon, 17 Oct 2016 19:54:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: prestonc@apache.org To: commits@asterixdb.apache.org Date: Mon, 17 Oct 2016 19:54:53 -0000 Message-Id: <73aaf050547c4cf2a66c84b79d6a7022@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [03/50] [abbrv] asterixdb git commit: snapshot super interval dag. archived-at: Mon, 17 Oct 2016 19:54:55 -0000 snapshot super interval dag. 
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/aea7fe87 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/aea7fe87 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/aea7fe87 Branch: refs/heads/ecarm002/interval_join_merge Commit: aea7fe87d3c2d03902a9e207e1afa457733e5171 Parents: 199bddd Author: Preston Carman Authored: Thu Jun 30 16:03:28 2016 -0700 Committer: Preston Carman Committed: Thu Jun 30 16:03:28 2016 -0700 ---------------------------------------------------------------------- .../IntervalLocalRangeSplitterOperator.java | 69 +++ .../physical/AbstractIntervalJoinPOperator.java | 17 + .../physical/IntervalIndexJoinPOperator.java | 1 - .../physical/IntervalJoinPOperator.java | 63 +++ .../IntervalLocalRangeOperatorDescriptor.java | 197 ++++++++ .../IntervalLocalRangeSplitterPOperator.java | 101 +++++ .../IntervalPartitionJoinPOperator.java | 20 + .../asterix/optimizer/base/RuleCollections.java | 2 + .../rules/IntervalSplitPartitioningRule.java | 445 +++++++++++++++++++ ...IntervalPartitionJoinOperatorDescriptor.java | 3 - .../logical/AbstractLogicalOperator.java | 16 +- .../operators/logical/ReplicateOperator.java | 12 +- .../operators/physical/MergeJoinPOperator.java | 16 + .../operators/physical/NLJoinPOperator.java | 295 ------------ .../physical/NestedLoopJoinPOperator.java | 295 ++++++++++++ .../RangePartitionExchangePOperator.java | 8 + .../RangePartitionMergeExchangePOperator.java | 8 + .../operators/physical/ReplicatePOperator.java | 2 +- .../operators/physical/UnionAllPOperator.java | 3 - .../typing/PropagatingTypeEnvironment.java | 4 +- .../algebricks/rewriter/util/JoinUtils.java | 6 +- .../std/misc/SplitOperatorDescriptor.java | 6 +- 22 files changed, 1259 insertions(+), 330 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java new file mode 100644 index 0000000..9ae9f7d --- /dev/null +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.algebra.operators; + +import java.util.Collection; +import java.util.List; + +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractExtensibleLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorExtension; +import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform; + +public class IntervalLocalRangeSplitterOperator extends AbstractExtensibleLogicalOperator { + + private final List joinKeyLogicalVars; + + public IntervalLocalRangeSplitterOperator(List joinKeyLogicalVars) { + this.joinKeyLogicalVars = joinKeyLogicalVars; + } + + @Override + public boolean isMap() { + return false; + } + + @Override + public IOperatorExtension newInstance() { + return new IntervalLocalRangeSplitterOperator(joinKeyLogicalVars); + } + + @Override + public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) + throws AlgebricksException { + return false; + } + + @Override + public String toString() { + return "IntervalLocalRangeSplitterOperator"; + } + + @Override + public void getUsedVariables(Collection usedVars) { + usedVars.addAll(joinKeyLogicalVars); + } + + @Override + public void getProducedVariables(Collection producedVars) { + // No produced variables. 
+ } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java index 146acd5..ca50f1b 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java @@ -67,6 +67,22 @@ public abstract class AbstractIntervalJoinPOperator extends AbstractJoinPOperato this.rangeMap = rangeMap; } + public List getKeysLeftBranch() { + return keysLeftBranch; + } + + public List getKeysRightBranch() { + return keysRightBranch; + } + + public IIntervalMergeJoinCheckerFactory getIntervalMergeJoinCheckerFactory() { + return mjcf; + } + + public IRangeMap getRangeMap() { + return rangeMap; + } + @Override public PhysicalOperatorTag getOperatorTag() { return PhysicalOperatorTag.EXTENSION_OPERATOR; @@ -154,4 +170,5 @@ public abstract class AbstractIntervalJoinPOperator extends AbstractJoinPOperato abstract IOperatorDescriptor getIntervalOperatorDescriptor(int[] keysLeft, int[] keysRight, IOperatorDescriptorRegistry spec, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory mjcf, IRangeMap rangeMap); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalIndexJoinPOperator.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalIndexJoinPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalIndexJoinPOperator.java index 97aaafc..32a0f56 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalIndexJoinPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalIndexJoinPOperator.java @@ -59,5 +59,4 @@ public class IntervalIndexJoinPOperator extends AbstractIntervalJoinPOperator { return new IntervalIndexJoinOperatorDescriptor(spec, memSizeInFrames, keysLeft, keysRight, recordDescriptor, mjcf); } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalJoinPOperator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalJoinPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalJoinPOperator.java new file mode 100644 index 0000000..528822d --- /dev/null +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalJoinPOperator.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.algebra.operators.physical; + +import java.util.List; +import java.util.logging.Logger; + +import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinCheckerFactory; +import org.apache.asterix.runtime.operators.joins.intervalindex.IntervalIndexJoinOperatorDescriptor; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind; +import org.apache.hyracks.api.dataflow.IOperatorDescriptor; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; + +public class IntervalJoinPOperator extends AbstractIntervalJoinPOperator { + + private final int memSizeInFrames; + + private static final Logger LOGGER = Logger.getLogger(IntervalJoinPOperator.class.getName()); + + public IntervalJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, + List sideLeftOfEqualities, List sideRightOfEqualities, + int memSizeInFrames, IIntervalMergeJoinCheckerFactory mjcf, IRangeMap rangeMap) { + super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities, mjcf, rangeMap); + this.memSizeInFrames = memSizeInFrames; + + LOGGER.fine("IntervalJoinPOperator constructed with: JoinKind=" + kind + ", JoinPartitioningType=" + + partitioningType + ", List=" + sideLeftOfEqualities + ", List=" + + sideRightOfEqualities + ", int memSizeInFrames=" + memSizeInFrames 
+ + ", IMergeJoinCheckerFactory mjcf=" + mjcf + ", IRangeMap rangeMap=" + rangeMap + "."); + } + + @Override + public String getIntervalJoin() { + return "INTERVAL_JOIN"; + } + + @Override + IOperatorDescriptor getIntervalOperatorDescriptor(int[] keysLeft, int[] keysRight, IOperatorDescriptorRegistry spec, + RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory mjcf, IRangeMap rangeMap) { + return new IntervalIndexJoinOperatorDescriptor(spec, memSizeInFrames, keysLeft, keysRight, recordDescriptor, + mjcf); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java new file mode 100644 index 0000000..392bf43 --- /dev/null +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.algebra.operators.physical; + +import java.nio.ByteBuffer; + +import org.apache.asterix.runtime.operators.joins.IntervalJoinUtil; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.ActivityId; +import org.apache.hyracks.api.dataflow.IActivity; +import org.apache.hyracks.api.dataflow.IActivityGraphBuilder; +import org.apache.hyracks.api.dataflow.IOperatorNodePushable; +import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.data.std.primitive.LongPointable; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; +import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; +import org.apache.hyracks.dataflow.std.base.AbstractActivityNode; +import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; +import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable; + +public class IntervalLocalRangeOperatorDescriptor extends AbstractOperatorDescriptor { + private static final long serialVersionUID = 1L; + private static final int PARTITION_ACTIVITY_ID = 
0; + + private static final int INPUT_STARTS = 0; + private static final int INPUT_COVERS = 2; + private static final int INPUT_ENDS = 1; + +// private static final int INPUT_STARTS = 0; +// private static final int INPUT_COVERS = 0; +// private static final int INPUT_ENDS = 0; + + private final int key; + private final IRangeMap rangeMap; + + public IntervalLocalRangeOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys, + RecordDescriptor recordDescriptor, IRangeMap rangeMap) { + super(spec, 1, 3); + key = keys[0]; + this.rangeMap = rangeMap; + } + + @Override + public void contributeActivities(IActivityGraphBuilder builder) { + ActivityId aid = new ActivityId(odId, PARTITION_ACTIVITY_ID); + IActivity phase = new PartitionActivityNode(aid); + + builder.addActivity(this, phase); + builder.addSourceEdge(0, phase, 0); + // Connect output + builder.addTargetEdge(0, phase, 0); + builder.addTargetEdge(1, phase, 1); + builder.addTargetEdge(2, phase, 2); + } + + private final class PartitionActivityNode extends AbstractActivityNode { + private static final long serialVersionUID = 1L; + + public PartitionActivityNode(ActivityId id) { + super(id); + } + + @Override + public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) { + return new AbstractUnaryInputOperatorNodePushable() { + private final IFrameWriter[] writers = new IFrameWriter[getOutputArity()]; + private final FrameTupleAppender[] resultAppender = new FrameTupleAppender[getOutputArity()]; + private final RecordDescriptor rd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); + private final FrameTupleAccessor accessor = new FrameTupleAccessor(rd); + private final long nodeRangeStart = getPartitionBoundryStart(); + private final long nodeRangeEnd = getPartitionBoundryEnd(); + + @Override + public void close() throws HyracksDataException { + flush(); + for (int i = 0; i < 
getOutputArity(); i++) { + writers[i].close(); + } + } + + @Override + public void flush() throws HyracksDataException { + for (int i = 0; i < getOutputArity(); i++) { + resultAppender[i].flush(writers[i]); + } + } + + @Override + public void fail() throws HyracksDataException { + for (int i = 0; i < getOutputArity(); i++) { + writers[i].fail(); + } + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + accessor.reset(buffer); + int tupleCount = accessor.getTupleCount(); + for (int i = 0; i < tupleCount; i++) { + int pid = localPartition(accessor, i, key); + if (pid < outputArity) { + FrameUtils.appendToWriter(writers[pid], resultAppender[pid], accessor, i); + } + } + } + + private int localPartition(FrameTupleAccessor accessor, int i, int key) { + long start = IntervalJoinUtil.getIntervalStart(accessor, i, key); + if (start < nodeRangeStart) { + long end = IntervalJoinUtil.getIntervalEnd(accessor, i, key); + if (end < nodeRangeEnd) { + // Ends + return INPUT_ENDS; + } else { + // Covers (match with all intervals) + return INPUT_COVERS; + } + } else { + // Start (responsible for matches) + return INPUT_STARTS; + } + } + + @Override + public void open() throws HyracksDataException { + for (int i = 0; i < getOutputArity(); i++) { + writers[i].open(); + resultAppender[i] = new FrameTupleAppender(new VSizeFrame(ctx), true); + } + } + + @Override + public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { + writers[index] = writer; + } + + long getPartitionBoundryStart() { + int fieldIndex = 0; + int slot = partition - 1; + long boundary = Long.MIN_VALUE; + // All lookups are on typed values. 
+ if (partition == 0) { + boundary = LongPointable.getLong(rangeMap.getMinByteArray(fieldIndex), + rangeMap.getMinStartOffset(fieldIndex) + 1); + } else if (partition <= rangeMap.getSplitCount()) { + boundary = LongPointable.getLong(rangeMap.getByteArray(fieldIndex, slot), + rangeMap.getStartOffset(fieldIndex, slot) + 1); + } else if (partition > rangeMap.getSplitCount()) { + boundary = LongPointable.getLong(rangeMap.getMaxByteArray(fieldIndex), + rangeMap.getMaxStartOffset(fieldIndex) + 1); + } + return boundary; + } + + long getPartitionBoundryEnd() { + int fieldIndex = 0; + int slot = partition; + long boundary = Long.MAX_VALUE; + // All lookups are on typed values. + if (partition < rangeMap.getSplitCount()) { + boundary = LongPointable.getLong(rangeMap.getByteArray(fieldIndex, slot), + rangeMap.getStartOffset(fieldIndex, slot) + 1); + } else if (partition == rangeMap.getSplitCount()) { + boundary = LongPointable.getLong(rangeMap.getMaxByteArray(fieldIndex), + rangeMap.getMaxStartOffset(fieldIndex) + 1); + } + return boundary; + } + }; + } + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java new file mode 100644 index 0000000..1150b91 --- /dev/null +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.algebra.operators.physical; + +import java.util.List; + +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator; +import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import org.apache.hyracks.api.dataflow.IOperatorDescriptor; +import 
org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; + +public class IntervalLocalRangeSplitterPOperator extends AbstractPhysicalOperator { + + private List intervalFields; + private IRangeMap rangeMap; + + public IntervalLocalRangeSplitterPOperator(List intervalFields, IRangeMap rangeMap) { + this.intervalFields = intervalFields; + this.rangeMap = rangeMap; + } + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.EXTENSION_OPERATOR; + } + + @Override + public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { + AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); + deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone(); + } + + @Override + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { + return emptyUnaryRequirements(); + } + + @Override + public String toString() { + return "IntervalLocalRangeSplitterPOperator " + intervalFields; + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + int[] keys = JobGenHelper.variablesToFieldIndexes(intervalFields, inputSchemas[0]); + + IOperatorDescriptorRegistry spec = builder.getJobSpec(); + RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, + context); + + IOperatorDescriptor opDesc = new IntervalLocalRangeOperatorDescriptor(spec, keys, recordDescriptor, rangeMap); + contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc); + // and contribute one edge from its child + 
ILogicalOperator src = op.getInputs().get(0).getValue(); + builder.contributeGraphEdge(src, 0, op, 0); + } + + @Override + public boolean isMicroOperator() { + return false; + } + + @Override + public boolean expensiveThanMaterialization() { + return false; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java index c52a9cd..414d0b4 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java @@ -62,6 +62,26 @@ public class IntervalPartitionJoinPOperator extends AbstractIntervalJoinPOperato + "."); } + public int getProbeTupleCount() { + return probeTupleCount; + } + + public int getProbeMaxDuration() { + return probeMaxDuration; + } + + public int getBuildTupleCount() { + return buildTupleCount; + } + + public int getBuildMaxDuration() { + return buildMaxDuration; + } + + public int getAvgTuplesInFrame() { + return avgTuplesInFrame; + } + @Override public String getIntervalJoin() { return "INTERVAL_PARTITION_JOIN"; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java index a2f8430..5fcfc94 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java @@ -39,6 +39,7 @@ import org.apache.asterix.optimizer.rules.FeedScanCollectionToUnnest; import org.apache.asterix.optimizer.rules.FuzzyEqRule; import org.apache.asterix.optimizer.rules.IfElseToSwitchCaseFunctionRule; import org.apache.asterix.optimizer.rules.InlineUnnestFunctionRule; +import org.apache.asterix.optimizer.rules.IntervalSplitPartitioningRule; import org.apache.asterix.optimizer.rules.IntroduceAutogenerateIDRule; import org.apache.asterix.optimizer.rules.IntroduceDynamicTypeCastForExternalFunctionRule; import org.apache.asterix.optimizer.rules.IntroduceDynamicTypeCastRule; @@ -332,6 +333,7 @@ public final class RuleCollections { prepareForJobGenRewrites .add(new IsolateHyracksOperatorsRule(HeuristicOptimizer.hyraxOperatorsBelowWhichJobGenIsDisabled)); prepareForJobGenRewrites.add(new ExtractCommonOperatorsRule()); + prepareForJobGenRewrites.add(new IntervalSplitPartitioningRule()); // Re-infer all types, so that, e.g., the effect of not-is-null is // propagated. 
prepareForJobGenRewrites.add(new ReinferAllTypesRule()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java new file mode 100644 index 0000000..a6f49f9 --- /dev/null +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.optimizer.rules; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.asterix.algebra.operators.IntervalLocalRangeSplitterOperator; +import org.apache.asterix.algebra.operators.physical.IntervalIndexJoinPOperator; +import org.apache.asterix.algebra.operators.physical.IntervalLocalRangeSplitterPOperator; +import org.apache.asterix.algebra.operators.physical.IntervalPartitionJoinPOperator; +import org.apache.asterix.om.functions.AsterixBuiltinFunctions; +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.ListSet; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.common.utils.Triple; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind; +import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.MergeJoinPOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedLoopJoinPOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionExchangePOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.UnionAllPOperator; +import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; +import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; +import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType; + +/** + * Before: + * + *
+ *
+ * Left
+ *
+ *
+ * Right
+ * 
+ * + * After: + * + *
+ *
+ * Left
+ *
+ *
+ * Right
+ * 
+ */ +public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule { + + private static final int LEFT = 0; + private static final int RIGHT = 1; + + private static final int START_SPLITS = 3; + + private static final Set intervalJoinConditions = new HashSet<>(); + static { + intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_AFTER); + intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_BEFORE); + intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_COVERED_BY); + intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_COVERS); + intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_ENDED_BY); + intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_ENDS); + intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_MEETS); + intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_MET_BY); + intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_OVERLAPPED_BY); + intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_OVERLAPPING); + intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_OVERLAPS); + intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_STARTED_BY); + intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_STARTS); + } + + @Override + public boolean rewritePre(Mutable opRef, IOptimizationContext context) + throws AlgebricksException { + return false; + } + + @Override + public boolean rewritePost(Mutable opRef, IOptimizationContext context) + throws AlgebricksException { + ILogicalOperator op = opRef.getValue(); + if (!isIntervalJoin(op)) { + return false; + } + InnerJoinOperator startsJoin = (InnerJoinOperator) op; + ExecutionMode mode = startsJoin.getExecutionMode(); + Mutable startsJoinRef = opRef; + Set localLiveVars = new ListSet<>(); + VariableUtilities.getLiveVariables(op, localLiveVars); + + Mutable leftSortedInput = op.getInputs().get(0); + Mutable rightSortedInput = op.getInputs().get(1); + if (leftSortedInput.getValue().getOperatorTag() != 
LogicalOperatorTag.EXCHANGE + && rightSortedInput.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE) { + return false; + } + + Mutable leftSorter = leftSortedInput.getValue().getInputs().get(0); + Mutable rightSorter = rightSortedInput.getValue().getInputs().get(0); + if (leftSorter.getValue().getOperatorTag() != LogicalOperatorTag.ORDER + && rightSorter.getValue().getOperatorTag() != LogicalOperatorTag.ORDER) { + return false; + } + LogicalVariable leftSortKey = getSortKey(leftSorter.getValue()); + LogicalVariable rightSortKey = getSortKey(rightSorter.getValue()); + if (leftSortKey == null || rightSortKey == null) { + return false; + } + + Mutable leftRangeInput = leftSorter.getValue().getInputs().get(0); + Mutable rightRangeInput = rightSorter.getValue().getInputs().get(0); + IRangeMap leftRangeMap = getRangeMapForBranch(leftRangeInput.getValue()); + IRangeMap rightRangeMap = getRangeMapForBranch(rightRangeInput.getValue()); + if (leftRangeMap == null || rightRangeMap == null) { + return false; + } + // TODO check physical join + + // Interval local partition operators + LogicalVariable leftJoinKey = getJoinKey(startsJoin.getCondition().getValue(), LEFT); + LogicalVariable rightJoinKey = getJoinKey(startsJoin.getCondition().getValue(), RIGHT); + if (leftJoinKey == null || rightJoinKey == null) { + return false; + } + ILogicalOperator leftIntervalSplit = getIntervalSplitOperator(leftSortKey, rightRangeMap, mode); + Mutable leftIntervalSplitRef = new MutableObject<>(leftIntervalSplit); + ILogicalOperator rightIntervalSplit = getIntervalSplitOperator(rightSortKey, rightRangeMap, mode); + Mutable rightIntervalSplitRef = new MutableObject<>(rightIntervalSplit); + + // Replicate operators + ReplicateOperator leftStartsSplit = getReplicateOperator(START_SPLITS, mode); + Mutable leftStartsSplitRef = new MutableObject<>(leftStartsSplit); + ReplicateOperator rightStartsSplit = getReplicateOperator(START_SPLITS, mode); + Mutable rightStartsSplitRef = new 
MutableObject<>(rightStartsSplit); + + // Covers Join Operator + ILogicalOperator leftCoversJoin = getNestedLoop(startsJoin.getCondition(), context, mode); + Mutable leftCoversJoinRef = new MutableObject<>(leftCoversJoin); + ILogicalOperator rightCoversJoin = getNestedLoop(startsJoin.getCondition(), context, mode); + Mutable rightCoversJoinRef = new MutableObject<>(rightCoversJoin); + + // Ends Join Operator + ILogicalOperator leftEndsJoin = getIntervalJoin(startsJoin, context, mode); + ILogicalOperator rightEndsJoin = getIntervalJoin(startsJoin, context, mode); + if (leftEndsJoin == null || rightEndsJoin == null) { + return false; + } + Mutable leftEndsJoinRef = new MutableObject<>(leftEndsJoin); + Mutable rightEndsJoinRef = new MutableObject<>(rightEndsJoin); + + // Union All Operator + ILogicalOperator union1 = getUnionOperator(localLiveVars, mode); + Mutable union1Ref = new MutableObject<>(union1); + ILogicalOperator union2 = getUnionOperator(localLiveVars, mode); + Mutable union2Ref = new MutableObject<>(union2); + ILogicalOperator union3 = getUnionOperator(localLiveVars, mode); + Mutable union3Ref = new MutableObject<>(union3); + ILogicalOperator union4 = getUnionOperator(localLiveVars, mode); + Mutable union4Ref = new MutableObject<>(union4); + + // Connect main path + connectOperators(leftIntervalSplitRef, leftSortedInput, context); + context.computeAndSetTypeEnvironmentForOperator(leftIntervalSplit); + connectOperators(leftStartsSplitRef, leftIntervalSplitRef, context); + context.computeAndSetTypeEnvironmentForOperator(leftStartsSplit); + connectOperators(rightIntervalSplitRef, rightSortedInput, context); + context.computeAndSetTypeEnvironmentForOperator(rightIntervalSplit); + connectOperators(rightStartsSplitRef, rightIntervalSplitRef, context); + context.computeAndSetTypeEnvironmentForOperator(rightStartsSplit); + updateConnections(startsJoinRef, leftStartsSplitRef, context, LEFT); + updateConnections(startsJoinRef, rightStartsSplitRef, context, RIGHT); 
+ context.computeAndSetTypeEnvironmentForOperator(startsJoin); + leftStartsSplit.getOutputs().add(startsJoinRef); + rightStartsSplit.getOutputs().add(startsJoinRef); + + // Connect left ends path + connectOperators(leftEndsJoinRef, leftIntervalSplitRef, context); + connectOperators(leftEndsJoinRef, rightStartsSplitRef, context); + context.computeAndSetTypeEnvironmentForOperator(leftEndsJoin); + connectOperators(union1Ref, leftEndsJoinRef, context); + connectOperators(union1Ref, startsJoinRef, context); + context.computeAndSetTypeEnvironmentForOperator(union1); + rightStartsSplit.getOutputs().add(leftEndsJoinRef); + + // Connect left covers path + connectOperators(leftCoversJoinRef, leftIntervalSplitRef, context); + connectOperators(leftCoversJoinRef, rightStartsSplitRef, context); + context.computeAndSetTypeEnvironmentForOperator(leftCoversJoin); + connectOperators(union2Ref, union1Ref, context); + connectOperators(union2Ref, leftCoversJoinRef, context); + context.computeAndSetTypeEnvironmentForOperator(union2); + rightStartsSplit.getOutputs().add(leftCoversJoinRef); + + // Connect right ends path + connectOperators(rightEndsJoinRef, leftStartsSplitRef, context); + connectOperators(rightEndsJoinRef, rightIntervalSplitRef, context); + context.computeAndSetTypeEnvironmentForOperator(rightEndsJoin); + connectOperators(union3Ref, union2Ref, context); + connectOperators(union3Ref, rightEndsJoinRef, context); + context.computeAndSetTypeEnvironmentForOperator(union3); + leftStartsSplit.getOutputs().add(rightEndsJoinRef); + + // Connect right covers path + connectOperators(rightCoversJoinRef, leftStartsSplitRef, context); + connectOperators(rightCoversJoinRef, rightIntervalSplitRef, context); + context.computeAndSetTypeEnvironmentForOperator(rightCoversJoin); + connectOperators(union4Ref, union3Ref, context); + connectOperators(union4Ref, rightCoversJoinRef, context); + context.computeAndSetTypeEnvironmentForOperator(union4); + 
leftStartsSplit.getOutputs().add(rightCoversJoinRef); + + // Update context + opRef.setValue(union4); + return true; + } + + private LogicalVariable getSortKey(ILogicalOperator op) { + if (op.getOperatorTag() != LogicalOperatorTag.ORDER) { + return null; + } + OrderOperator oo = (OrderOperator) op; + List>> order = oo.getOrderExpressions(); + Mutable sortLe = order.get(0).second; + if (sortLe.getValue().getExpressionTag() == LogicalExpressionTag.VARIABLE) { + return ((VariableReferenceExpression) sortLe.getValue()).getVariableReference(); + } + return null; + } + + private LogicalVariable getJoinKey(ILogicalExpression expr, int branch) { + if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) { + return null; + } + // Check whether the function is a function we want to push. + AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr; + if (!intervalJoinConditions.contains(funcExpr.getFunctionIdentifier())) { + return null; + } + ILogicalExpression funcArg = funcExpr.getArguments().get(branch).getValue(); + if (funcArg instanceof VariableReferenceExpression) { + return ((VariableReferenceExpression) funcArg).getVariableReference(); + } + return null; + } + + private void connectOperators(Mutable from, Mutable to, + IOptimizationContext context) throws AlgebricksException { + if (to.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE) { + ILogicalOperator eo = getExchangeOperator(from.getValue().getExecutionMode()); + Mutable eoRef = new MutableObject<>(eo); + eo.getInputs().add(to); + from.getValue().getInputs().add(eoRef); + context.computeAndSetTypeEnvironmentForOperator(eo); + context.computeAndSetTypeEnvironmentForOperator(from.getValue()); + } else { + from.getValue().getInputs().add(to); + context.computeAndSetTypeEnvironmentForOperator(from.getValue()); + } + } + + private void updateConnections(Mutable from, Mutable to, + IOptimizationContext context, int index) throws AlgebricksException { + if 
(from.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE) { + ILogicalOperator eo = getExchangeOperator(from.getValue().getExecutionMode()); + Mutable eoRef = new MutableObject<>(eo); + eo.getInputs().add(to); + from.getValue().getInputs().set(index, eoRef); + context.computeAndSetTypeEnvironmentForOperator(from.getValue()); + context.computeAndSetTypeEnvironmentForOperator(eo); + } else { + from.getValue().getInputs().set(index, to); + context.computeAndSetTypeEnvironmentForOperator(from.getValue()); + } + } + + private ILogicalOperator getExchangeOperator(ExecutionMode mode) { + ExchangeOperator eo = new ExchangeOperator(); + eo.setPhysicalOperator(new OneToOneExchangePOperator()); + eo.setExecutionMode(mode); + return eo; + } + + private ILogicalOperator getIntervalSplitOperator(LogicalVariable key, IRangeMap rangeMap, ExecutionMode mode) { + List joinKeyLogicalVars = new ArrayList<>(); + joinKeyLogicalVars.add(key); + //create the logical and physical operator + IntervalLocalRangeSplitterOperator splitOperator = new IntervalLocalRangeSplitterOperator(joinKeyLogicalVars); + IntervalLocalRangeSplitterPOperator splitPOperator = new IntervalLocalRangeSplitterPOperator(joinKeyLogicalVars, + rangeMap); + splitOperator.setPhysicalOperator(splitPOperator); + splitOperator.setExecutionMode(mode); + + //create ExtensionOperator and put the commitOperator in it. 
+ ExtensionOperator extensionOperator = new ExtensionOperator(splitOperator); + extensionOperator.setPhysicalOperator(splitPOperator); + extensionOperator.setExecutionMode(mode); + return extensionOperator; + } + + private ReplicateOperator getReplicateOperator(int outputArity, ExecutionMode mode) { + boolean[] flags = new boolean[outputArity]; + ReplicateOperator ro = new ReplicateOperator(flags.length, flags); + ReplicatePOperator rpo = new ReplicatePOperator(); + ro.setPhysicalOperator(rpo); + ro.setExecutionMode(mode); + return ro; + } + + private ILogicalOperator getNestedLoop(Mutable condition, IOptimizationContext context, + ExecutionMode mode) { + int memoryJoinSize = context.getPhysicalOptimizationConfig().getMaxFramesForJoin(); + InnerJoinOperator ijo = new InnerJoinOperator(condition); + NestedLoopJoinPOperator nljpo = new NestedLoopJoinPOperator(JoinKind.INNER, JoinPartitioningType.BROADCAST, + memoryJoinSize); + ijo.setPhysicalOperator(nljpo); + ijo.setExecutionMode(mode); + return ijo; + } + + private ILogicalOperator getIntervalJoin(ILogicalOperator op, IOptimizationContext context, ExecutionMode mode) { + if (op.getOperatorTag() != LogicalOperatorTag.INNERJOIN) { + return null; + } + InnerJoinOperator ijo = (InnerJoinOperator) op; + InnerJoinOperator ijoClone = new InnerJoinOperator(ijo.getCondition()); + + int memoryJoinSize = context.getPhysicalOptimizationConfig().getMaxFramesForJoin(); + IPhysicalOperator joinPo = ijo.getPhysicalOperator(); + if (joinPo.getOperatorTag() == PhysicalOperatorTag.MERGE_JOIN) { + MergeJoinPOperator mjpo = (MergeJoinPOperator) joinPo; + MergeJoinPOperator mjpoClone = new MergeJoinPOperator(mjpo.getKind(), mjpo.getPartitioningType(), + mjpo.getKeysLeftBranch(), mjpo.getKeysRightBranch(), memoryJoinSize, + mjpo.getMergeJoinCheckerFactory(), mjpo.getRangeMap()); + ijoClone.setPhysicalOperator(mjpoClone); + } else if (joinPo.getOperatorTag() == PhysicalOperatorTag.EXTENSION_OPERATOR) { + if (joinPo instanceof 
IntervalIndexJoinPOperator) { + IntervalIndexJoinPOperator iijpo = (IntervalIndexJoinPOperator) joinPo; + IntervalIndexJoinPOperator iijpoClone = new IntervalIndexJoinPOperator(iijpo.getKind(), + iijpo.getPartitioningType(), iijpo.getKeysLeftBranch(), iijpo.getKeysRightBranch(), + memoryJoinSize, iijpo.getIntervalMergeJoinCheckerFactory(), iijpo.getRangeMap()); + ijoClone.setPhysicalOperator(iijpoClone); + } else if (joinPo instanceof IntervalPartitionJoinPOperator) { + IntervalPartitionJoinPOperator ipjpo = (IntervalPartitionJoinPOperator) joinPo; + IntervalPartitionJoinPOperator iijpoClone = new IntervalPartitionJoinPOperator(ipjpo.getKind(), + ipjpo.getPartitioningType(), ipjpo.getKeysLeftBranch(), ipjpo.getKeysRightBranch(), + memoryJoinSize, ipjpo.getBuildTupleCount(), ipjpo.getProbeTupleCount(), + ipjpo.getBuildMaxDuration(), ipjpo.getProbeMaxDuration(), ipjpo.getAvgTuplesInFrame(), + ipjpo.getIntervalMergeJoinCheckerFactory(), ipjpo.getRangeMap()); + ijoClone.setPhysicalOperator(iijpoClone); + } else { + return null; + } + } else { + return null; + } + ijoClone.setExecutionMode(mode); + return ijoClone; + } + + private ILogicalOperator getUnionOperator(Set localLiveVars, ExecutionMode mode) { + List> varMap = new ArrayList<>(); + for (LogicalVariable lv : localLiveVars) { + varMap.add(new Triple(lv, lv, lv)); + } + UnionAllOperator uao = new UnionAllOperator(varMap); + uao.setPhysicalOperator(new UnionAllPOperator()); + uao.setExecutionMode(mode); + return uao; + } + + private boolean isIntervalJoin(ILogicalOperator op) { + if (op.getOperatorTag() != LogicalOperatorTag.INNERJOIN) { + return false; + } + // TODO add check for condition. 
+ InnerJoinOperator ijo = (InnerJoinOperator) op; + if (ijo.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.MERGE_JOIN) { + return true; + } + if (ijo.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.EXTENSION_OPERATOR) { + return true; + } + return false; + } + + private IRangeMap getRangeMapForBranch(ILogicalOperator op) { + if (op.getOperatorTag() != LogicalOperatorTag.EXCHANGE) { + return null; + } + ExchangeOperator exchangeLeft = (ExchangeOperator) op; + if (exchangeLeft.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.RANGE_PARTITION_EXCHANGE) { + return null; + } + RangePartitionExchangePOperator exchangeLeftPO = (RangePartitionExchangePOperator) exchangeLeft + .getPhysicalOperator(); + if (exchangeLeftPO.getRangeType() != RangePartitioningType.SPLIT) { + return null; + } + return exchangeLeftPO.getRangeMap(); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java index 0d8d7bf..21e07a5 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java @@ -19,9 +19,6 @@ package org.apache.asterix.runtime.operators.joins.intervalpartition; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; import java.nio.ByteBuffer; 
import java.util.logging.Level; import java.util.logging.Logger; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java index 1a7e224..da85612 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java @@ -52,7 +52,7 @@ public abstract class AbstractLogicalOperator implements ILogicalOperator { * partition and only processes data from that partition */ - public static enum ExecutionMode { + public enum ExecutionMode { UNPARTITIONED, PARTITIONED, LOCAL @@ -60,15 +60,14 @@ public abstract class AbstractLogicalOperator implements ILogicalOperator { private AbstractLogicalOperator.ExecutionMode mode = AbstractLogicalOperator.ExecutionMode.UNPARTITIONED; protected IPhysicalOperator physicalOperator; - private final Map annotations = new HashMap(); + private final Map annotations = new HashMap<>(); private boolean bJobGenEnabled = true; - final protected List> inputs; - // protected List outputs; + protected final List> inputs; protected List schema; public AbstractLogicalOperator() { - inputs = new ArrayList>(); + inputs = new ArrayList<>(); } @Override @@ -134,11 +133,6 @@ public abstract class AbstractLogicalOperator implements ILogicalOperator { return inputs; } - // @Override - // 
public final List getOutputs() { - // return outputs; - // } - @Override public final boolean hasInputs() { return !inputs.isEmpty(); @@ -161,7 +155,7 @@ public abstract class AbstractLogicalOperator implements ILogicalOperator { @Override public final void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) - throws AlgebricksException { + throws AlgebricksException { if (bJobGenEnabled) { if (physicalOperator == null) { throw new AlgebricksException("Physical operator not set for operator: " + this); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java index 343ace8..834107c 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java @@ -34,20 +34,20 @@ import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisit public class ReplicateOperator extends AbstractLogicalOperator { - private int outputArity = 2; - private boolean[] outputMaterializationFlags = new boolean[outputArity]; + private int outputArity; + private boolean[] outputMaterializationFlags; private List> outputs; public ReplicateOperator(int outputArity) { this.outputArity = outputArity; 
this.outputMaterializationFlags = new boolean[outputArity]; - this.outputs = new ArrayList>(); + this.outputs = new ArrayList<>(); } public ReplicateOperator(int outputArity, boolean[] outputMaterializationFlags) { this.outputArity = outputArity; this.outputMaterializationFlags = outputMaterializationFlags; - this.outputs = new ArrayList>(); + this.outputs = new ArrayList<>(); } @Override @@ -89,10 +89,6 @@ public class ReplicateOperator extends AbstractLogicalOperator { return outputArity; } - public int setOutputArity(int outputArity) { - return this.outputArity = outputArity; - } - public void setOutputMaterializationFlags(boolean[] outputMaterializationFlags) { this.outputMaterializationFlags = outputMaterializationFlags; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java index 04c34b8..51f54f6 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java @@ -75,6 +75,22 @@ public class MergeJoinPOperator extends AbstractJoinPOperator { + mjcf + ", IRangeMap rangeMap=" + rangeMap + "."); } + public List getKeysLeftBranch() { + return keysLeftBranch; + } + + public List getKeysRightBranch() { + return keysRightBranch; + } + + public IMergeJoinCheckerFactory getMergeJoinCheckerFactory() { + 
return mjcf; + } + + public IRangeMap getRangeMap() { + return rangeMap; + } + @Override public PhysicalOperatorTag getOperatorTag() { return PhysicalOperatorTag.MERGE_JOIN; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java deleted file mode 100644 index 5384347..0000000 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java +++ /dev/null @@ -1,295 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.algebricks.core.algebra.operators.physical; - -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; -import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; -import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; -import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; -import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; -import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; -import org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty; -import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; -import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; -import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; -import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; -import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; -import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; -import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; -import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector; -import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory; -import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; -import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; -import 
org.apache.hyracks.api.comm.IFrameTupleAccessor; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.IOperatorDescriptor; -import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; -import org.apache.hyracks.api.dataflow.value.ITuplePairComparator; -import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.data.std.api.IPointable; -import org.apache.hyracks.data.std.primitive.VoidPointable; -import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; -import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; -import org.apache.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor; - -/** - * The right input is broadcast and the left input can be partitioned in any way. 
- */ -public class NLJoinPOperator extends AbstractJoinPOperator { - - private final int memSize; - - public NLJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, int memSize) { - super(kind, partitioningType); - this.memSize = memSize; - } - - @Override - public PhysicalOperatorTag getOperatorTag() { - return PhysicalOperatorTag.NESTED_LOOP; - } - - @Override - public boolean isMicroOperator() { - return false; - } - - @Override - public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context) { - if (partitioningType != JoinPartitioningType.BROADCAST) { - throw new NotImplementedException(partitioningType + " nested loop joins are not implemented."); - } - - IPartitioningProperty pp; - - AbstractLogicalOperator op = (AbstractLogicalOperator) iop; - - if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) { - AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(1).getValue(); - IPhysicalPropertiesVector pv1 = op2.getPhysicalOperator().getDeliveredProperties(); - if (pv1 == null) { - pp = null; - } else { - pp = pv1.getPartitioningProperty(); - } - } else { - pp = IPartitioningProperty.UNPARTITIONED; - } - - // Nested loop join cannot maintain the local structure property for the probe side - // because of the I/O optimization for the build branch. - this.deliveredProperties = new StructuralPropertiesVector(pp, null); - } - - @Override - public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { - if (partitioningType != JoinPartitioningType.BROADCAST) { - throw new NotImplementedException(partitioningType + " nested loop joins are not implemented."); - } - - StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2]; - - // TODO: leverage statistics to make better decisions. 
- pv[0] = new StructuralPropertiesVector(null, null); - pv[1] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(context.getComputationNodeDomain()), - null); - return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION); - } - - @Override - public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, - IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) - throws AlgebricksException { - AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op; - RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), - propagatedSchema, context); - IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1]; - conditionInputSchemas[0] = propagatedSchema; - IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider(); - IScalarEvaluatorFactory cond = expressionRuntimeProvider.createEvaluatorFactory(join.getCondition().getValue(), - context.getTypeEnvironment(op), conditionInputSchemas, context); - ITuplePairComparatorFactory comparatorFactory = new TuplePairEvaluatorFactory(cond, - context.getBinaryBooleanInspectorFactory()); - IOperatorDescriptorRegistry spec = builder.getJobSpec(); - IOperatorDescriptor opDesc = null; - - switch (kind) { - case INNER: { - opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, false, - null); - break; - } - case LEFT_OUTER: { - IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[inputSchemas[1].getSize()]; - for (int j = 0; j < nonMatchWriterFactories.length; j++) { - nonMatchWriterFactories[j] = context.getMissingWriterFactory(); - } - opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, true, - nonMatchWriterFactories); - break; - } - default: { - throw new NotImplementedException(); - } - } - 
contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc); - - ILogicalOperator src1 = op.getInputs().get(0).getValue(); - builder.contributeGraphEdge(src1, 0, op, 0); - ILogicalOperator src2 = op.getInputs().get(1).getValue(); - builder.contributeGraphEdge(src2, 0, op, 1); - } - - public static class TuplePairEvaluatorFactory implements ITuplePairComparatorFactory { - - private static final long serialVersionUID = 1L; - private final IScalarEvaluatorFactory cond; - private final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory; - - public TuplePairEvaluatorFactory(IScalarEvaluatorFactory cond, - IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) { - this.cond = cond; - this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory; - } - - @Override - public synchronized ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) { - return new TuplePairEvaluator(ctx, cond, binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx)); - } - } - - public static class TuplePairEvaluator implements ITuplePairComparator { - private final IHyracksTaskContext ctx; - private IScalarEvaluator condEvaluator; - private final IScalarEvaluatorFactory condFactory; - private final IPointable p; - private final CompositeFrameTupleReference compositeTupleRef; - private final FrameTupleReference leftRef; - private final FrameTupleReference rightRef; - private final IBinaryBooleanInspector binaryBooleanInspector; - - public TuplePairEvaluator(IHyracksTaskContext ctx, IScalarEvaluatorFactory condFactory, - IBinaryBooleanInspector binaryBooleanInspector) { - this.ctx = ctx; - this.condFactory = condFactory; - this.binaryBooleanInspector = binaryBooleanInspector; - this.leftRef = new FrameTupleReference(); - this.p = VoidPointable.FACTORY.createPointable(); - this.rightRef = new FrameTupleReference(); - this.compositeTupleRef = new CompositeFrameTupleReference(leftRef, rightRef); - } - - @Override - public int 
compare(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor, - int innerIndex) throws HyracksDataException { - if (condEvaluator == null) { - try { - this.condEvaluator = condFactory.createScalarEvaluator(ctx); - } catch (AlgebricksException ae) { - throw new HyracksDataException(ae); - } - } - compositeTupleRef.reset(outerAccessor, outerIndex, innerAccessor, innerIndex); - try { - condEvaluator.evaluate(compositeTupleRef, p); - } catch (AlgebricksException ae) { - throw new HyracksDataException(ae); - } - boolean result = binaryBooleanInspector.getBooleanValue(p.getByteArray(), p.getStartOffset(), - p.getLength()); - if (result) { - return 0; - } else { - return 1; - } - } - } - - public static class CompositeFrameTupleReference implements IFrameTupleReference { - - private final FrameTupleReference refLeft; - private final FrameTupleReference refRight; - - public CompositeFrameTupleReference(FrameTupleReference refLeft, FrameTupleReference refRight) { - this.refLeft = refLeft; - this.refRight = refRight; - } - - public void reset(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor, - int innerIndex) { - refLeft.reset(outerAccessor, outerIndex); - refRight.reset(innerAccessor, innerIndex); - } - - @Override - public int getFieldCount() { - return refLeft.getFieldCount() + refRight.getFieldCount(); - } - - @Override - public byte[] getFieldData(int fIdx) { - int leftFieldCount = refLeft.getFieldCount(); - if (fIdx < leftFieldCount) { - return refLeft.getFieldData(fIdx); - } else { - return refRight.getFieldData(fIdx - leftFieldCount); - } - } - - @Override - public int getFieldStart(int fIdx) { - int leftFieldCount = refLeft.getFieldCount(); - if (fIdx < leftFieldCount) { - return refLeft.getFieldStart(fIdx); - } else { - return refRight.getFieldStart(fIdx - leftFieldCount); - } - } - - @Override - public int getFieldLength(int fIdx) { - int leftFieldCount = refLeft.getFieldCount(); - if (fIdx < 
leftFieldCount) { - return refLeft.getFieldLength(fIdx); - } else { - return refRight.getFieldLength(fIdx - leftFieldCount); - } - } - - @Override - public IFrameTupleAccessor getFrameTupleAccessor() { - throw new NotImplementedException(); - } - - @Override - public int getTupleIndex() { - throw new NotImplementedException(); - } - - } -}