Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E7E0C200BA1 for ; Mon, 17 Oct 2016 21:54:56 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E6816160AF0; Mon, 17 Oct 2016 19:54:56 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C067B160B0B for ; Mon, 17 Oct 2016 21:54:53 +0200 (CEST) Received: (qmail 54368 invoked by uid 500); 17 Oct 2016 19:54:52 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 53484 invoked by uid 99); 17 Oct 2016 19:54:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 17 Oct 2016 19:54:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 555C2F1593; Mon, 17 Oct 2016 19:54:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: prestonc@apache.org To: commits@asterixdb.apache.org Date: Mon, 17 Oct 2016 19:55:13 -0000 Message-Id: <1a728a528bf0445e80ea66b56883e272@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [23/50] [abbrv] asterixdb git commit: Finalize the range in context. archived-at: Mon, 17 Oct 2016 19:54:57 -0000 Finalize the range in context. Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/13af53a7 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/13af53a7 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/13af53a7 Branch: refs/heads/ecarm002/interval_join_merge Commit: 13af53a7b26c421e58848f6614b2cd0534cd72f4 Parents: 2061a38 Author: Preston Carman Authored: Mon Aug 15 14:30:00 2016 -0700 Committer: Preston Carman Committed: Mon Aug 15 14:30:00 2016 -0700 ---------------------------------------------------------------------- .../physical/AbstractIntervalJoinPOperator.java | 19 +++++++++++---- .../IntervalLocalRangeOperatorDescriptor.java | 2 +- .../rules/IntervalSplitPartitioningRule.java | 8 ++++--- .../asterix/optimizer/rules/util/JoinUtils.java | 25 +++++++++++--------- .../AfterIntervalMergeJoinCheckerFactory.java | 4 ++-- .../BeforeIntervalMergeJoinCheckerFactory.java | 4 ++-- ...overedByIntervalMergeJoinCheckerFactory.java | 4 ++-- .../CoversIntervalMergeJoinCheckerFactory.java | 4 ++-- .../EndedByIntervalMergeJoinCheckerFactory.java | 4 ++-- .../EndsIntervalMergeJoinCheckerFactory.java | 4 ++-- .../joins/IIntervalMergeJoinCheckerFactory.java | 4 ++-- .../MeetsIntervalMergeJoinCheckerFactory.java | 4 ++-- .../MetByIntervalMergeJoinCheckerFactory.java | 4 ++-- ...lappedByIntervalMergeJoinCheckerFactory.java | 4 ++-- ...rlappingIntervalMergeJoinCheckerFactory.java | 15 ++++++++++-- ...OverlapsIntervalMergeJoinCheckerFactory.java | 4 ++-- ...tartedByIntervalMergeJoinCheckerFactory.java | 4 ++-- .../StartsIntervalMergeJoinCheckerFactory.java | 4 ++-- .../intervalindex/IntervalIndexJoiner.java | 2 +- ...IntervalPartitionJoinOperatorDescriptor.java | 16 +++++++------ .../operators/physical/MergeJoinPOperator.java | 15 +++++++++--- .../properties/OrderedPartitionedProperty.java | 6 +++++ .../algebra/util/OperatorPropertiesUtil.java | 15 +++++++----- .../rules/EnforceStructuralPropertiesRule.java | 4 +--- .../hyracks/dataflow/std/base/RangeId.java | 25 ++++++++++++++++++-- .../connectors/PartitionRangeDataWriter.java | 2 +- .../std/join/IMergeJoinCheckerFactory.java | 4 ++-- .../std/join/MergeJoinOperatorDescriptor.java | 2 +- .../join/NaturalMergeJoinCheckerFactory.java | 4 ++-- .../misc/RangeForwardOperatorDescriptor.java | 3 ++- .../std/sort/AbstractExternalSortRunMerger.java | 24 +++++++++---------- 31 files changed, 155 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java index c400cdf..3be9e80 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java @@ -84,10 +84,18 @@ public abstract class AbstractIntervalJoinPOperator extends AbstractJoinPOperato return mjcf; } - public RangeId getRangeId() { + public RangeId getLeftRangeId() { return leftRangeId; } + public RangeId getRightRangeId() { + return rightRangeId; + } + + public IRangeMap getRangeMapHint() { + return rangeMapHint; + } + @Override public PhysicalOperatorTag getOperatorTag() { return PhysicalOperatorTag.EXTENSION_OPERATOR; @@ -111,7 +119,8 @@ public abstract class AbstractIntervalJoinPOperator extends AbstractJoinPOperato for (LogicalVariable v : keysLeftBranch) { order.add(new OrderColumn(v, mjcf.isOrderAsc() ? OrderKind.ASC : OrderKind.DESC)); } - IPartitioningProperty pp = new OrderedPartitionedProperty(order, null, leftRangeId, RangePartitioningType.PROJECT, rangeMapHint); + IPartitioningProperty pp = new OrderedPartitionedProperty(order, null, leftRangeId, + RangePartitioningType.PROJECT, rangeMapHint); List propsLocal = new ArrayList<>(); propsLocal.add(new LocalOrderProperty(order)); deliveredProperties = new StructuralPropertiesVector(pp, propsLocal); @@ -141,8 +150,10 @@ public abstract class AbstractIntervalJoinPOperator extends AbstractJoinPOperato ispRight.add(new LocalOrderProperty(orderRight)); if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) { - ppLeft = new OrderedPartitionedProperty(orderLeft, null, leftRangeId, mjcf.getLeftPartitioningType(), rangeMapHint); - ppRight = new OrderedPartitionedProperty(orderRight, null, rightRangeId, mjcf.getRightPartitioningType(), rangeMapHint); + ppLeft = new OrderedPartitionedProperty(orderLeft, null, leftRangeId, mjcf.getLeftPartitioningType(), + rangeMapHint); + ppRight = new OrderedPartitionedProperty(orderRight, null, rightRangeId, mjcf.getRightPartitioningType(), + rangeMapHint); } pv[0] = new StructuralPropertiesVector(ppLeft, ispLeft); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java index 584e30f..f24dc7c 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java @@ -154,7 +154,7 @@ public class IntervalLocalRangeOperatorDescriptor extends AbstractOperatorDescri writers[i].open(); resultAppender[i] = new FrameTupleAppender(new VSizeFrame(ctx), true); } - RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(rangeId); + RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(new RangeId(rangeId.getId(), ctx)); IRangeMap rangeMap = rangeState.getRangeMap(); nodeRangeStart = getPartitionBoundryStart(rangeMap); nodeRangeEnd = getPartitionBoundryEnd(rangeMap); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java index eafc2bb..f0ff610 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java @@ -418,14 +418,15 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule { MergeJoinPOperator mjpo = (MergeJoinPOperator) joinPo; MergeJoinPOperator mjpoClone = new MergeJoinPOperator(mjpo.getKind(), mjpo.getPartitioningType(), mjpo.getKeysLeftBranch(), mjpo.getKeysRightBranch(), memoryJoinSize, - mjpo.getMergeJoinCheckerFactory(), mjpo.getRangeId(), null); + mjpo.getMergeJoinCheckerFactory(), mjpo.getLeftRangeId(), mjpo.getRightRangeId(), null); ijoClone.setPhysicalOperator(mjpoClone); } else if (joinPo.getOperatorTag() == PhysicalOperatorTag.EXTENSION_OPERATOR) { if (joinPo instanceof IntervalIndexJoinPOperator) { IntervalIndexJoinPOperator iijpo = (IntervalIndexJoinPOperator) joinPo; IntervalIndexJoinPOperator iijpoClone = new IntervalIndexJoinPOperator(iijpo.getKind(), iijpo.getPartitioningType(), iijpo.getKeysLeftBranch(), iijpo.getKeysRightBranch(), - memoryJoinSize, iijpo.getIntervalMergeJoinCheckerFactory(), iijpo.getRangeId(), null); + memoryJoinSize, iijpo.getIntervalMergeJoinCheckerFactory(), iijpo.getLeftRangeId(), + iijpo.getRightRangeId(), null); ijoClone.setPhysicalOperator(iijpoClone); } else if (joinPo instanceof IntervalPartitionJoinPOperator) { IntervalPartitionJoinPOperator ipjpo = (IntervalPartitionJoinPOperator) joinPo; @@ -433,7 +434,8 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule { ipjpo.getPartitioningType(), ipjpo.getKeysLeftBranch(), ipjpo.getKeysRightBranch(), memoryJoinSize, ipjpo.getBuildTupleCount(), ipjpo.getProbeTupleCount(), ipjpo.getBuildMaxDuration(), ipjpo.getProbeMaxDuration(), ipjpo.getAvgTuplesInFrame(), - ipjpo.getIntervalMergeJoinCheckerFactory(), ipjpo.getRangeId(), null); + ipjpo.getIntervalMergeJoinCheckerFactory(), ipjpo.getLeftRangeId(), ipjpo.getRightRangeId(), + null); ijoClone.setPhysicalOperator(iijpoClone); } else { return null; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java index 2707403..795ee82 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java @@ -55,7 +55,6 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceE import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType; -import org.apache.hyracks.api.dataflow.value.IRangeMap; import org.apache.hyracks.algebricks.core.algebra.operators.physical.MergeJoinPOperator; import org.apache.hyracks.dataflow.std.base.RangeId; import org.apache.hyracks.dataflow.std.join.IMergeJoinCheckerFactory; @@ -142,9 +141,10 @@ public class JoinUtils { private static void setSortMergeIntervalJoinOp(AbstractBinaryJoinOperator op, FunctionIdentifier fi, List sideLeft, List sideRight, IntervalJoinExpressionAnnotation ijea, IOptimizationContext context) { - IMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi); + RangeId leftRangeId = context.newRangeId(); + IMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi, leftRangeId); op.setPhysicalOperator(new MergeJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST, sideLeft, - sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), mjcf, context.newRangeId(), + sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), mjcf, leftRangeId, context.newRangeId(), ijea.getRangeMap())); } @@ -161,20 +161,22 @@ public class JoinUtils { int tuplesPerFrame = ijea.getTuplesPerFrame() > 0 ? ijea.getTuplesPerFrame() : context.getPhysicalOptimizationConfig().getMaxRecordsPerFrame(); - IIntervalMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi); + RangeId leftRangeId = context.newRangeId(); + IIntervalMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi, leftRangeId); op.setPhysicalOperator(new IntervalPartitionJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST, sideLeft, sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), leftCount, - rightCount, leftMaxDuration, rightMaxDuration, tuplesPerFrame, mjcf, context.newRangeId(), - context.newRangeId(), ijea.getRangeMap())); + rightCount, leftMaxDuration, rightMaxDuration, tuplesPerFrame, mjcf, leftRangeId, context.newRangeId(), + ijea.getRangeMap())); } private static void setIntervalIndexJoinOp(AbstractBinaryJoinOperator op, FunctionIdentifier fi, List sideLeft, List sideRight, IntervalJoinExpressionAnnotation ijea, IOptimizationContext context) { - IIntervalMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi); + RangeId leftRangeId = context.newRangeId(); + IIntervalMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi, leftRangeId); op.setPhysicalOperator(new IntervalIndexJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST, - sideLeft, sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), mjcf, - context.newRangeId(), context.newRangeId(), ijea.getRangeMap())); + sideLeft, sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), mjcf, leftRangeId, + context.newRangeId(), ijea.getRangeMap())); } private static int getMaxDuration(List lv, IOptimizationContext context) { @@ -230,8 +232,9 @@ public class JoinUtils { } } - private static IIntervalMergeJoinCheckerFactory getIntervalMergeJoinCheckerFactory(FunctionIdentifier fi) { - IIntervalMergeJoinCheckerFactory mjcf = new OverlappingIntervalMergeJoinCheckerFactory(); + private static IIntervalMergeJoinCheckerFactory getIntervalMergeJoinCheckerFactory(FunctionIdentifier fi, + RangeId rangeId) { + IIntervalMergeJoinCheckerFactory mjcf = new OverlappingIntervalMergeJoinCheckerFactory(rangeId); if (fi.equals(AsterixBuiltinFunctions.INTERVAL_OVERLAPPED_BY)) { mjcf = new OverlappedByIntervalMergeJoinCheckerFactory(); } else if (fi.equals(AsterixBuiltinFunctions.INTERVAL_OVERLAPS)) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AfterIntervalMergeJoinCheckerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AfterIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AfterIntervalMergeJoinCheckerFactory.java index 572241c..09b3020 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AfterIntervalMergeJoinCheckerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AfterIntervalMergeJoinCheckerFactory.java @@ -18,14 +18,14 @@ */ package org.apache.asterix.runtime.operators.joins; -import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; public class AfterIntervalMergeJoinCheckerFactory extends AbstractIntervalMergeJoinCheckerFactory { private static final long serialVersionUID = 1L; @Override - public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) { + public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) { return new AfterIntervalMergeJoinChecker(keys0, keys1); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/BeforeIntervalMergeJoinCheckerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/BeforeIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/BeforeIntervalMergeJoinCheckerFactory.java index ff5acf2..3e913d2 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/BeforeIntervalMergeJoinCheckerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/BeforeIntervalMergeJoinCheckerFactory.java @@ -18,14 +18,14 @@ */ package org.apache.asterix.runtime.operators.joins; -import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; public class BeforeIntervalMergeJoinCheckerFactory extends AbstractIntervalMergeJoinCheckerFactory { private static final long serialVersionUID = 1L; @Override - public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) { + public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) { return new BeforeIntervalMergeJoinChecker(keys0, keys1); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoveredByIntervalMergeJoinCheckerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoveredByIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoveredByIntervalMergeJoinCheckerFactory.java index 64b0c2a..a513cbc 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoveredByIntervalMergeJoinCheckerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoveredByIntervalMergeJoinCheckerFactory.java @@ -18,13 +18,13 @@ */ package org.apache.asterix.runtime.operators.joins; -import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.api.context.IHyracksTaskContext; public class CoveredByIntervalMergeJoinCheckerFactory extends AbstractIntervalInverseMergeJoinCheckerFactory { private static final long serialVersionUID = 1L; @Override - public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) { + public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) { return new CoveredByIntervalMergeJoinChecker(keys0, keys1); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoversIntervalMergeJoinCheckerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoversIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoversIntervalMergeJoinCheckerFactory.java index dc50451..cc9b37d 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoversIntervalMergeJoinCheckerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoversIntervalMergeJoinCheckerFactory.java @@ -18,13 +18,13 @@ */ package org.apache.asterix.runtime.operators.joins; -import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.api.context.IHyracksTaskContext; public class CoversIntervalMergeJoinCheckerFactory extends AbstractIntervalMergeJoinCheckerFactory { private static final long serialVersionUID = 1L; @Override - public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) { + public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) { return new CoversIntervalMergeJoinChecker(keys0, keys1); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndedByIntervalMergeJoinCheckerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndedByIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndedByIntervalMergeJoinCheckerFactory.java index 68e2922..c3a681c 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndedByIntervalMergeJoinCheckerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndedByIntervalMergeJoinCheckerFactory.java @@ -18,14 +18,14 @@ */ package org.apache.asterix.runtime.operators.joins; -import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; public class EndedByIntervalMergeJoinCheckerFactory extends AbstractIntervalInverseMergeJoinCheckerFactory { private static final long serialVersionUID = 1L; @Override - public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) { + public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) { return new EndedByIntervalMergeJoinChecker(keys0, keys1); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndsIntervalMergeJoinCheckerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndsIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndsIntervalMergeJoinCheckerFactory.java index e5b7be0..295bd04 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndsIntervalMergeJoinCheckerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndsIntervalMergeJoinCheckerFactory.java @@ -18,14 +18,14 @@ */ package org.apache.asterix.runtime.operators.joins; -import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; public class EndsIntervalMergeJoinCheckerFactory extends AbstractIntervalMergeJoinCheckerFactory { private static final long serialVersionUID = 1L; @Override - public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) { + public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) { return new EndsIntervalMergeJoinChecker(keys0, keys1); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IIntervalMergeJoinCheckerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IIntervalMergeJoinCheckerFactory.java index e4ceeb1..f2e3d80 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IIntervalMergeJoinCheckerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IIntervalMergeJoinCheckerFactory.java @@ -20,14 +20,14 @@ package org.apache.asterix.runtime.operators.joins; import java.io.Serializable; -import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.std.join.IMergeJoinCheckerFactory; public interface IIntervalMergeJoinCheckerFactory extends IMergeJoinCheckerFactory, Serializable { @Override - public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) + public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) throws HyracksDataException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MeetsIntervalMergeJoinCheckerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MeetsIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MeetsIntervalMergeJoinCheckerFactory.java index 038f9ef..c970fd2 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MeetsIntervalMergeJoinCheckerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MeetsIntervalMergeJoinCheckerFactory.java @@ -18,14 +18,14 @@ */ package org.apache.asterix.runtime.operators.joins; -import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; public class MeetsIntervalMergeJoinCheckerFactory extends AbstractIntervalMergeJoinCheckerFactory { private static final long serialVersionUID = 1L; @Override - public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) { + public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) { return new MeetsIntervalMergeJoinChecker(keys0, keys1); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MetByIntervalMergeJoinCheckerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MetByIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MetByIntervalMergeJoinCheckerFactory.java index 6c3fe32..fad2d88 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MetByIntervalMergeJoinCheckerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MetByIntervalMergeJoinCheckerFactory.java @@ -18,14 +18,14 @@ */ package org.apache.asterix.runtime.operators.joins; -import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; public class MetByIntervalMergeJoinCheckerFactory extends AbstractIntervalInverseMergeJoinCheckerFactory { private static final long serialVersionUID = 1L; @Override - public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) { + public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) { return new MetByIntervalMergeJoinChecker(keys0, keys1); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappedByIntervalMergeJoinCheckerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappedByIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappedByIntervalMergeJoinCheckerFactory.java index 8031181..c47381a 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappedByIntervalMergeJoinCheckerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappedByIntervalMergeJoinCheckerFactory.java @@ -18,13 +18,13 @@ */ package org.apache.asterix.runtime.operators.joins; -import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.api.context.IHyracksTaskContext; public class OverlappedByIntervalMergeJoinCheckerFactory extends AbstractIntervalInverseMergeJoinCheckerFactory { private static final long serialVersionUID = 1L; @Override - public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) { + public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) { return new OverlappedByIntervalMergeJoinChecker(keys0, keys1); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java index 195a85f..a5f7770 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java @@ -19,18 +19,29 @@ package org.apache.asterix.runtime.operators.joins; import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IRangeMap; import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.primitive.LongPointable; +import org.apache.hyracks.dataflow.std.base.RangeId; +import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState; public class OverlappingIntervalMergeJoinCheckerFactory extends AbstractIntervalMergeJoinCheckerFactory { private static final long serialVersionUID = 1L; + private final RangeId rangeId; + + public OverlappingIntervalMergeJoinCheckerFactory(RangeId rangeId) { + this.rangeId = rangeId; + } @Override - public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) - throws HyracksDataException { + public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, + IHyracksTaskContext ctx) throws HyracksDataException { int fieldIndex = 0; + RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx + .getStateObject(new RangeId(rangeId.getId(), ctx)); + IRangeMap rangeMap = rangeState.getRangeMap(); if (ATypeTag.INT64.serialize() != rangeMap.getTag(0, 0)) { throw new HyracksDataException("Invalid range map type for interval merge join checker."); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlapsIntervalMergeJoinCheckerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlapsIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlapsIntervalMergeJoinCheckerFactory.java index e3ecf2e..0cf3ac1 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlapsIntervalMergeJoinCheckerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlapsIntervalMergeJoinCheckerFactory.java @@ -18,13 +18,13 @@ */ package org.apache.asterix.runtime.operators.joins; -import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.api.context.IHyracksTaskContext; public class OverlapsIntervalMergeJoinCheckerFactory extends AbstractIntervalMergeJoinCheckerFactory { private static final long serialVersionUID = 1L; @Override - public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) { + public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) { return new OverlapsIntervalMergeJoinChecker(keys0, keys1); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartedByIntervalMergeJoinCheckerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartedByIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartedByIntervalMergeJoinCheckerFactory.java index 431aa8e..0938fe2 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartedByIntervalMergeJoinCheckerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartedByIntervalMergeJoinCheckerFactory.java @@ -18,14 +18,14 @@ */ package org.apache.asterix.runtime.operators.joins; -import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; public class StartedByIntervalMergeJoinCheckerFactory extends AbstractIntervalInverseMergeJoinCheckerFactory { private static final long serialVersionUID = 1L; @Override - public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) { + public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) { return new StartedByIntervalMergeJoinChecker(keys0, keys1); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartsIntervalMergeJoinCheckerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartsIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartsIntervalMergeJoinCheckerFactory.java index a05615c..924b442 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartsIntervalMergeJoinCheckerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartsIntervalMergeJoinCheckerFactory.java @@ -18,14 +18,14 @@ */ package org.apache.asterix.runtime.operators.joins; -import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; public class StartsIntervalMergeJoinCheckerFactory extends AbstractIntervalMergeJoinCheckerFactory { private static final long serialVersionUID = 1L; @Override - public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) { + public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) { return new StartsIntervalMergeJoinChecker(keys0, keys1); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java index 9ca536b..6f04cad 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java @@ -82,7 +82,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { super(ctx, partition, status, locks, leftRd, rightRd); this.point = imjcf.isOrderAsc() ? EndPointIndexItem.START_POINT : EndPointIndexItem.END_POINT; - this.imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition, null); + this.imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition, ctx); this.leftKey = leftKeys[0]; this.rightKey = rightKeys[0]; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java index 8c4c43d..4e1850c 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java @@ -160,18 +160,20 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes } } - RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(rangeId); - long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(), partition); + RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx + .getStateObject(new RangeId(rangeId.getId(), ctx)); + long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(), + partition); long partitionEnd = IntervalPartitionUtil.getEndOfPartition(rangeState.getRangeMap(), partition); - ITuplePartitionComputer buildHpc = new IntervalPartitionComputerFactory(buildKey, k, partitionStart, - partitionEnd).createPartitioner(); - ITuplePartitionComputer probeHpc = new IntervalPartitionComputerFactory(probeKey, k, partitionStart, - partitionEnd).createPartitioner(); + ITuplePartitionComputer buildHpc = new IntervalPartitionComputerFactory(buildKey, state.k, + partitionStart, partitionEnd).createPartitioner(); + ITuplePartitionComputer probeHpc = new IntervalPartitionComputerFactory(probeKey, state.k, + partitionStart, partitionEnd).createPartitioner(); state.partition = partition; state.intervalPartitions = IntervalPartitionUtil.getMaxPartitions(state.k); state.memoryForJoin = memsize; - IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(buildKeys, probeKeys, partition, rangeState.getRangeMap()); + IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(buildKeys, probeKeys, partition, ctx); state.ipj = new IntervalPartitionJoiner(ctx, state.memoryForJoin, state.k, state.intervalPartitions, BUILD_REL, PROBE_REL, imjc, buildRd, probeRd, buildHpc, probeHpc); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java index c24f3c8..7fa7fdf 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java @@ -93,10 +93,18 @@ public class MergeJoinPOperator extends AbstractJoinPOperator { return mjcf; } - public RangeId getRangeId() { + public RangeId getLeftRangeId() { return leftRangeId; } + public RangeId getRightRangeId() { + return rightRangeId; + } + + public IRangeMap getRangeMapHint() { + return rangeMapHint; + } + @Override public PhysicalOperatorTag getOperatorTag() { return PhysicalOperatorTag.MERGE_JOIN; @@ -113,8 +121,8 @@ public class MergeJoinPOperator extends AbstractJoinPOperator { for (LogicalVariable v : keysLeftBranch) { order.add(new OrderColumn(v, mjcf.isOrderAsc() ? OrderKind.ASC : OrderKind.DESC)); } - IPartitioningProperty pp = new OrderedPartitionedProperty(order, null, leftRangeId, RangePartitioningType.PROJECT, - rangeMapHint); + IPartitioningProperty pp = new OrderedPartitionedProperty(order, null, leftRangeId, + RangePartitioningType.PROJECT, rangeMapHint); List propsLocal = new ArrayList<>(); propsLocal.add(new LocalOrderProperty(order)); deliveredProperties = new StructuralPropertiesVector(pp, propsLocal); @@ -176,4 +184,5 @@ public class MergeJoinPOperator extends AbstractJoinPOperator { ILogicalOperator src2 = op.getInputs().get(1).getValue(); builder.contributeGraphEdge(src2, 0, op, 1); } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java index 040e663..92d4098 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java @@ -35,6 +35,7 @@ public class OrderedPartitionedProperty implements IPartitioningProperty { private INodeDomain domain; private RangeId rangeId; private RangePartitioningType rangeType; + private IRangeMap rangeMapHint; public OrderedPartitionedProperty(List orderColumns, INodeDomain domain, RangeId rangeId, RangePartitioningType rangeType, IRangeMap rangeMapHint) { @@ -42,6 +43,7 @@ public class OrderedPartitionedProperty implements IPartitioningProperty { this.orderColumns = orderColumns; this.rangeId = rangeId; this.rangeType = rangeType; + this.rangeMapHint = rangeMapHint; } public OrderedPartitionedProperty(List orderColumns, INodeDomain domain, RangeId rangeId) { @@ -93,6 +95,10 @@ public class OrderedPartitionedProperty implements IPartitioningProperty { return rangeId; } + public IRangeMap getRangeMapHint() { + return rangeMapHint; + } + @Override public INodeDomain getNodeDomain() { return domain; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java index 353a782..fd340d2 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java @@ -45,6 +45,9 @@ public class OperatorPropertiesUtil { private static final String MOVABLE = "isMovable"; + private OperatorPropertiesUtil() { + } + public static boolean disjoint(Collection c1, Collection c2) { for (T m : c1) { if (c2.contains(m)) { @@ -58,7 +61,7 @@ public class OperatorPropertiesUtil { private static void getFreeVariablesInOp(ILogicalOperator op, Set freeVars) throws AlgebricksException { VariableUtilities.getUsedVariables(op, freeVars); - HashSet produced = new HashSet(); + HashSet produced = new HashSet<>(); VariableUtilities.getProducedVariables(op, produced); for (LogicalVariable v : produced) { freeVars.remove(v); @@ -75,13 +78,13 @@ public class OperatorPropertiesUtil { */ public static void getFreeVariablesInSelfOrDesc(AbstractLogicalOperator op, Set freeVars) throws AlgebricksException { - HashSet produced = new HashSet(); + HashSet produced = new HashSet<>(); VariableUtilities.getProducedVariables(op, produced); for (LogicalVariable v : produced) { freeVars.remove(v); } - HashSet used = new HashSet(); + HashSet used = new HashSet<>(); VariableUtilities.getUsedVariables(op, used); for (LogicalVariable v : used) { freeVars.add(v); @@ -108,7 +111,7 @@ public class OperatorPropertiesUtil { */ public static void getFreeVariablesInPath(ILogicalOperator op, ILogicalOperator dest, Set freeVars) throws AlgebricksException { - Set producedVars = new ListSet(); + Set producedVars = new ListSet<>(); VariableUtilities.getLiveVariables(op, freeVars); collectUsedAndProducedVariablesInPath(op, dest, freeVars, producedVars); freeVars.removeAll(producedVars); @@ -163,13 +166,13 @@ public class OperatorPropertiesUtil { } public static boolean hasFreeVariablesInSelfOrDesc(AbstractLogicalOperator op) throws AlgebricksException { - HashSet free = new HashSet(); + HashSet free = new HashSet<>(); getFreeVariablesInSelfOrDesc(op, free); return !free.isEmpty(); } public static boolean hasFreeVariables(ILogicalOperator op) throws AlgebricksException { - HashSet free = new HashSet(); + HashSet free = new HashSet<>(); getFreeVariablesInOp(op, free); return !free.isEmpty(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java index 4ec5e27..f3adffd 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java @@ -586,9 +586,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { List reqdLocals = required.getLocalProperties(); // Add RangeForwardOperator. - IRangeMap rangeMap = (IRangeMap) op.getAnnotations() - .get(OperatorAnnotations.USE_RANGE_CONNECTOR); - addRangeForwardOperator(op.getInputs().get(i), opp.getRangeId(), rangeMap, context); + addRangeForwardOperator(op.getInputs().get(i), opp.getRangeId(), opp.getRangeMapHint(), context); boolean propWasSet = false; pop = null; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java index befaad9..774dd2a 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java @@ -20,12 +20,25 @@ package org.apache.hyracks.dataflow.std.base; import java.io.Serializable; +import org.apache.hyracks.api.context.IHyracksTaskContext; + /** * Represents a range id in a logical plan. */ public final class RangeId implements Serializable { private static final long serialVersionUID = 1L; private final int id; + private int partition = -1; + + public RangeId(int id, int partition) { + this.id = id; + this.partition = partition; + } + + public RangeId(int id, IHyracksTaskContext ctx) { + this.id = id; + this.partition = ctx.getTaskAttemptId().getTaskId().getPartition(); + } public RangeId(int id) { this.id = id; @@ -35,9 +48,17 @@ public final class RangeId implements Serializable { return id; } + public int getPartition() { + return partition; + } + + public void setPartition(int partition) { + this.partition = partition; + } + @Override public String toString() { - return "RangeId(#" + id + ")"; + return "RangeId(#" + id + (partition >= 0 ? "," + partition : "") + ")"; } @Override @@ -45,7 +66,7 @@ public final class RangeId implements Serializable { if (!(obj instanceof RangeId)) { return false; } else { - return id == ((RangeId) obj).getId(); + return id == ((RangeId) obj).getId() && partition == ((RangeId) obj).getPartition(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java index 2740a60..c08035a 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java @@ -50,7 +50,7 @@ public class PartitionRangeDataWriter extends AbstractPartitionDataWriter { @Override public void open() throws HyracksDataException { super.open(); - RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(rangeId); + RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(new RangeId(rangeId.getId(), ctx)); tpc = trpcf.createPartitioner(rangeState.getRangeMap()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java index 850bf56..d7ac550 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java @@ -20,13 +20,13 @@ package org.apache.hyracks.dataflow.std.join; import java.io.Serializable; -import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; import org.apache.hyracks.api.exceptions.HyracksDataException; public interface IMergeJoinCheckerFactory extends Serializable { - IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) throws HyracksDataException; + IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) throws HyracksDataException; RangePartitioningType getLeftPartitioningType(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java index 6f0b33b..5624bb5 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java @@ -203,7 +203,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor { locks.setPartitions(nPartitions); RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); final IMergeJoinChecker mjc = mergeJoinCheckerFactory.createMergeJoinChecker(leftKeys, rightKeys, - partition, null); + partition, ctx); return new RightDataOperator(ctx, partition, inRecordDesc, mjc); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java index 15df580..abdadb6 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java @@ -18,9 +18,9 @@ */ package org.apache.hyracks.dataflow.std.join; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.IRangeMap; import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator; @@ -33,7 +33,7 @@ public class NaturalMergeJoinCheckerFactory implements IMergeJoinCheckerFactory } @Override - public IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) { + public IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) { final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length]; for (int i = 0; i < comparatorFactories.length; ++i) { comparators[i] = comparatorFactories[i].createBinaryComparator(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java index 067246d..04cfca3 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java @@ -90,7 +90,8 @@ public class RangeForwardOperatorDescriptor extends AbstractOperatorDescriptor { @Override public void open() throws HyracksDataException { - state = new RangeForwardTaskState(ctx.getJobletContext().getJobId(), rangeId, rangeMap); + state = new RangeForwardTaskState(ctx.getJobletContext().getJobId(), + new RangeId(rangeId.getId(), ctx), rangeMap); ctx.setStateObject(state); writer.open(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java index e032e6a..6d9d085 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java @@ -49,7 +49,6 @@ public abstract class AbstractExternalSortRunMerger { private final INormalizedKeyComputer nmkComputer; private final RecordDescriptor recordDesc; private final int framesLimit; - private final int MAX_FRAME_SIZE; private final int topK; private List inFrames; private VSizeFrame outputFrame; @@ -75,14 +74,13 @@ public abstract class AbstractExternalSortRunMerger { this.recordDesc = recordDesc; this.framesLimit = framesLimit; this.writer = writer; - this.MAX_FRAME_SIZE = FrameConstants.MAX_FRAMESIZE; this.topK = topK; } public void process() throws HyracksDataException { IFrameWriter finalWriter = null; try { - if (runs.size() <= 0) { + if (runs.isEmpty()) { finalWriter = prepareSkipMergingFinalResultWriter(writer); finalWriter.open(); if (sorter != null) { @@ -169,9 +167,10 @@ public abstract class AbstractExternalSortRunMerger { } } - private static int selectPartialRuns(int budget, List runs, + private static int selectPartialRuns(int argBudget, List runs, List partialRuns, BitSet runAvailable, int stop) { partialRuns.clear(); + int budget = argBudget; int maxFrameSizeOfGenRun = 0; int nextRunId = runAvailable.nextSetBit(0); while (budget > 0 && nextRunId >= 0 && nextRunId < stop) { @@ -192,13 +191,14 @@ public abstract class AbstractExternalSortRunMerger { if (extraFreeMem > 0 && partialRuns.size() > 1) { int extraFrames = extraFreeMem / ctx.getInitialFrameSize(); int avg = (extraFrames / partialRuns.size()) * ctx.getInitialFrameSize(); - int residue = (extraFrames % partialRuns.size()); + int residue = extraFrames % partialRuns.size(); for (int i = 0; i < residue; i++) { - partialRuns.get(i).updateSize(Math.min(MAX_FRAME_SIZE, + partialRuns.get(i).updateSize(Math.min(FrameConstants.MAX_FRAMESIZE, partialRuns.get(i).getMaxFrameSize() + avg + ctx.getInitialFrameSize())); } for (int i = residue; i < partialRuns.size() && avg > 0; i++) { - partialRuns.get(i).updateSize(Math.min(MAX_FRAME_SIZE, partialRuns.get(i).getMaxFrameSize() + avg)); + partialRuns.get(i) + .updateSize(Math.min(FrameConstants.MAX_FRAMESIZE, partialRuns.get(i).getMaxFrameSize() + avg)); } } @@ -214,17 +214,17 @@ public abstract class AbstractExternalSortRunMerger { } } - abstract protected IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) + protected abstract IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) throws HyracksDataException; - abstract protected RunFileWriter prepareIntermediateMergeRunFile() throws HyracksDataException; + protected abstract RunFileWriter prepareIntermediateMergeRunFile() throws HyracksDataException; - abstract protected IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter) + protected abstract IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter) throws HyracksDataException; - abstract protected IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException; + protected abstract IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException; - abstract protected int[] getSortFields(); + protected abstract int[] getSortFields(); private void merge(IFrameWriter writer, List partialRuns) throws HyracksDataException { RunMergingFrameReader merger = new RunMergingFrameReader(ctx, partialRuns, inFrames, getSortFields(),