Return-Path: X-Original-To: apmail-asterixdb-commits-archive@minotaur.apache.org Delivered-To: apmail-asterixdb-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4B52618DFE for ; Tue, 25 Aug 2015 17:08:09 +0000 (UTC) Received: (qmail 48834 invoked by uid 500); 25 Aug 2015 17:08:09 -0000 Delivered-To: apmail-asterixdb-commits-archive@asterixdb.apache.org Received: (qmail 48755 invoked by uid 500); 25 Aug 2015 17:08:09 -0000 Mailing-List: contact commits-help@asterixdb.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.incubator.apache.org Delivered-To: mailing list commits@asterixdb.incubator.apache.org Received: (qmail 48692 invoked by uid 99); 25 Aug 2015 17:08:09 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Aug 2015 17:08:09 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 8CC1D182348 for ; Tue, 25 Aug 2015 17:08:08 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.774 X-Spam-Level: * X-Spam-Status: No, score=1.774 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.006] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id CpE_z9-JB-ck for ; Tue, 25 Aug 2015 17:07:56 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 7285E506EF for ; Tue, 25 Aug 2015 17:07:55 +0000 (UTC) Received: (qmail 69524 invoked by uid 99); 25 Aug 2015 16:41:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Aug 2015 16:41:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D0083E6826; Tue, 25 Aug 2015 16:41:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: imaxon@apache.org To: commits@asterixdb.incubator.apache.org Date: Tue, 25 Aug 2015 16:41:39 -0000 Message-Id: In-Reply-To: <339f74e1fc5c4aabb18d5e46ecf6ba11@git.apache.org> References: <339f74e1fc5c4aabb18d5e46ecf6ba11@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [26/51] [partial] incubator-asterixdb-hyracks git commit: Change folder structure for Java repackage http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java ---------------------------------------------------------------------- diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java new file mode 100644 index 0000000..3abdec9 --- /dev/null +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java @@ -0,0 +1,281 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical; + +import java.util.LinkedList; +import java.util.List; +import java.util.logging.Logger; + +import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; +import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor; +import edu.uci.ics.hyracks.api.context.IHyracksTaskContext; +import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor; +import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator; +import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; +import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily; +import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory; +import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory; +import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider; +import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator; +import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory; +import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor; +import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; +import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry; +import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor; +import edu.uci.ics.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor; + +public class HybridHashJoinPOperator extends AbstractHashJoinPOperator { + + private final int memSizeInFrames; + private final int maxInputBuildSizeInFrames; + private final int aveRecordsPerFrame; + private final double fudgeFactor; + + private static final Logger LOGGER = Logger.getLogger(HybridHashJoinPOperator.class.getName()); + + public HybridHashJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, + List sideLeftOfEqualities, List sideRightOfEqualities, + int memSizeInFrames, int maxInputSizeInFrames, int aveRecordsPerFrame, double fudgeFactor) { + super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities); + this.memSizeInFrames = memSizeInFrames; + this.maxInputBuildSizeInFrames = maxInputSizeInFrames; + this.aveRecordsPerFrame = aveRecordsPerFrame; + this.fudgeFactor = fudgeFactor; + + LOGGER.fine("HybridHashJoinPOperator constructed with: JoinKind=" + kind + ", JoinPartitioningType=" + + partitioningType + ", List=" + sideLeftOfEqualities + ", List=" + + sideRightOfEqualities + ", int memSizeInFrames=" + memSizeInFrames + ", int maxInputSize0InFrames=" + + maxInputSizeInFrames + ", int aveRecordsPerFrame=" + aveRecordsPerFrame + ", double fudgeFactor=" + + fudgeFactor + "."); + } + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.HYBRID_HASH_JOIN; + } + + @Override + public boolean isMicroOperator() { + return false; + } + + public double getFudgeFactor() { + return fudgeFactor; + } + + public int getMemSizeInFrames() { + return memSizeInFrames; + } + + @Override + public String toString() { + return getOperatorTag().toString() + " " + keysLeftBranch + keysRightBranch; + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]); + int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]); + IVariableTypeEnvironment env = context.getTypeEnvironment(op); + IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper.variablesToBinaryHashFunctionFactories( + keysLeftBranch, env, context); + IBinaryHashFunctionFamily[] hashFunFamilies = JobGenHelper.variablesToBinaryHashFunctionFamilies( + keysLeftBranch, env, context); + IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length]; + int i = 0; + IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider(); + for (LogicalVariable v : keysLeftBranch) { + Object t = env.getVarType(v); + comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true); + } + + IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider = context + .getPredicateEvaluatorFactoryProvider(); + IPredicateEvaluatorFactory predEvaluatorFactory = (predEvaluatorFactoryProvider == null ? null + : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight)); + + RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), + propagatedSchema, context); + IOperatorDescriptorRegistry spec = builder.getJobSpec(); + IOperatorDescriptor opDesc = null; + + boolean optimizedHashJoin = true; + for (IBinaryHashFunctionFamily family : hashFunFamilies) { + if (family == null) { + optimizedHashJoin = false; + break; + } + } + + if (!optimizedHashJoin) { + try { + switch (kind) { + case INNER: { + opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(), + maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight, + hashFunFactories, comparatorFactories, recDescriptor, predEvaluatorFactory); + break; + } + case LEFT_OUTER: { + INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()]; + for (int j = 0; j < nullWriterFactories.length; j++) { + nullWriterFactories[j] = context.getNullWriterFactory(); + } + opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(), + maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight, + hashFunFactories, comparatorFactories, recDescriptor, predEvaluatorFactory, true, + nullWriterFactories); + break; + } + default: { + throw new NotImplementedException(); + } + } + } catch (HyracksDataException e) { + throw new AlgebricksException(e); + } + } else { + try { + switch (kind) { + case INNER: { + opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(), + maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies, + comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories, + keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories, + keysRight, keysLeft), predEvaluatorFactory); + break; + } + case LEFT_OUTER: { + INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()]; + for (int j = 0; j < nullWriterFactories.length; j++) { + nullWriterFactories[j] = context.getNullWriterFactory(); + } + opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(), + maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies, + comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories, + keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories, + keysRight, keysLeft), predEvaluatorFactory, true, nullWriterFactories); + break; + } + default: { + throw new NotImplementedException(); + } + } + } catch (HyracksDataException e) { + throw new AlgebricksException(e); + } + } + contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc); + + ILogicalOperator src1 = op.getInputs().get(0).getValue(); + builder.contributeGraphEdge(src1, 0, op, 0); + ILogicalOperator src2 = op.getInputs().get(1).getValue(); + builder.contributeGraphEdge(src2, 0, op, 1); + } + + @Override + protected List deliveredLocalProperties(ILogicalOperator op, IOptimizationContext context) + throws AlgebricksException { + return new LinkedList(); + } + +} + +/** + * {@ ITuplePairComparatorFactory} implementation for optimized hybrid hash join. + */ +class JoinMultiComparatorFactory implements ITuplePairComparatorFactory { + private static final long serialVersionUID = 1L; + + private final IBinaryComparatorFactory[] binaryComparatorFactories; + private final int[] keysLeft; + private final int[] keysRight; + + public JoinMultiComparatorFactory(IBinaryComparatorFactory[] binaryComparatorFactory, int[] keysLeft, + int[] keysRight) { + this.binaryComparatorFactories = binaryComparatorFactory; + this.keysLeft = keysLeft; + this.keysRight = keysRight; + } + + @Override + public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) { + IBinaryComparator[] binaryComparators = new IBinaryComparator[binaryComparatorFactories.length]; + for (int i = 0; i < binaryComparators.length; i++) { + binaryComparators[i] = binaryComparatorFactories[i].createBinaryComparator(); + } + return new JoinMultiComparator(binaryComparators, keysLeft, keysRight); + } +} + +/** + * {@ ITuplePairComparator} implementation for optimized hybrid hash join. + * The comparator applies multiple binary comparators, one for each key pairs + */ +class JoinMultiComparator implements ITuplePairComparator { + private final IBinaryComparator[] binaryComparators; + private final int[] keysLeft; + private final int[] keysRight; + + public JoinMultiComparator(IBinaryComparator[] bComparator, int[] keysLeft, int[] keysRight) { + this.binaryComparators = bComparator; + this.keysLeft = keysLeft; + this.keysRight = keysRight; + } + + @Override + public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) + throws HyracksDataException { + int tStart0 = accessor0.getTupleStartOffset(tIndex0); + int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0; + + int tStart1 = accessor1.getTupleStartOffset(tIndex1); + int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1; + + for (int i = 0; i < binaryComparators.length; i++) { + int fStart0 = accessor0.getFieldStartOffset(tIndex0, keysLeft[i]); + int fEnd0 = accessor0.getFieldEndOffset(tIndex0, keysLeft[i]); + int fLen0 = fEnd0 - fStart0; + + int fStart1 = accessor1.getFieldStartOffset(tIndex1, keysRight[i]); + int fEnd1 = accessor1.getFieldEndOffset(tIndex1, keysRight[i]); + int fLen1 = fEnd1 - fStart1; + + int c = binaryComparators[i].compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, + accessor1.getBuffer().array(), fStart1 + fStartOffset1, fLen1); + if (c != 0) { + return c; + } + } + return 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java ---------------------------------------------------------------------- diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java new file mode 100644 index 0000000..55616f9 --- /dev/null +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java @@ -0,0 +1,139 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical; + +import java.util.LinkedList; +import java.util.List; + +import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; +import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor; +import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; +import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory; +import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory; +import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider; +import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor; +import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry; +import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor; + +public class InMemoryHashJoinPOperator extends AbstractHashJoinPOperator { + + private final int tableSize; + + /** + * builds on the first operator and probes on the second. + */ + + public InMemoryHashJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, + List sideLeftOfEqualities, List sideRightOfEqualities, int tableSize) { + super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities); + this.tableSize = tableSize; + } + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.IN_MEMORY_HASH_JOIN; + } + + @Override + public String toString() { + return getOperatorTag().toString() + " " + keysLeftBranch + keysRightBranch; + } + + @Override + public boolean isMicroOperator() { + return false; + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]); + int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]); + IVariableTypeEnvironment env = context.getTypeEnvironment(op); + IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper.variablesToBinaryHashFunctionFactories( + keysLeftBranch, env, context); + IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length]; + int i = 0; + IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider(); + for (LogicalVariable v : keysLeftBranch) { + Object t = env.getVarType(v); + comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true); + } + + IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider = context.getPredicateEvaluatorFactoryProvider(); + IPredicateEvaluatorFactory predEvaluatorFactory = ( predEvaluatorFactoryProvider == null ? null : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight)); + + RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), + propagatedSchema, context); + IOperatorDescriptorRegistry spec = builder.getJobSpec(); + IOperatorDescriptor opDesc = null; + + switch (kind) { + case INNER: { + opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, hashFunFactories, + comparatorFactories, recDescriptor, tableSize, predEvaluatorFactory); + break; + } + case LEFT_OUTER: { + INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()]; + for (int j = 0; j < nullWriterFactories.length; j++) { + nullWriterFactories[j] = context.getNullWriterFactory(); + } + opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, hashFunFactories, + comparatorFactories, predEvaluatorFactory, recDescriptor, true, nullWriterFactories, tableSize); + break; + } + default: { + throw new NotImplementedException(); + } + } + contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc); + + ILogicalOperator src1 = op.getInputs().get(0).getValue(); + builder.contributeGraphEdge(src1, 0, op, 0); + ILogicalOperator src2 = op.getInputs().get(1).getValue(); + builder.contributeGraphEdge(src2, 0, op, 1); + } + + @Override + protected List deliveredLocalProperties(ILogicalOperator op, IOptimizationContext context) { + AbstractLogicalOperator op0 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); + IPhysicalPropertiesVector pv0 = op0.getPhysicalOperator().getDeliveredProperties(); + List lp0 = pv0.getLocalProperties(); + if (lp0 != null) { + // maintains the local properties on the probe side + return new LinkedList(lp0); + } + return new LinkedList(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryStableSortPOperator.java ---------------------------------------------------------------------- diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryStableSortPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryStableSortPOperator.java new file mode 100644 index 0000000..4a7d875 --- /dev/null +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryStableSortPOperator.java @@ -0,0 +1,82 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical; + +import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; +import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider; +import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory; +import edu.uci.ics.hyracks.algebricks.runtime.operators.sort.InMemorySortRuntimeFactory; +import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; +import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor; + +public class InMemoryStableSortPOperator extends AbstractStableSortPOperator { + + public InMemoryStableSortPOperator() { + } + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.IN_MEMORY_STABLE_SORT; + } + + @Override + public boolean isMicroOperator() { + return true; + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context); + int n = sortColumns.length; + int[] sortFields = new int[n]; + IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n]; + int i = 0; + INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider(); + INormalizedKeyComputerFactory nkcf = null; + IVariableTypeEnvironment env = context.getTypeEnvironment(op); + for (OrderColumn oc : sortColumns) { + LogicalVariable var = oc.getColumn(); + sortFields[i] = opSchema.findVariable(var); + Object type = env.getVarType(var); + OrderKind order = oc.getOrder(); + if (i == 0 && nkcfProvider != null && type != null) { + nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC); + } + + IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider(); + comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC); + i++; + } + + IPushRuntimeFactory runtime = new InMemorySortRuntimeFactory(sortFields, nkcf, comps, null); + builder.contributeMicroOperator(op, runtime, recDescriptor); + ILogicalOperator src = op.getInputs().get(0).getValue(); + builder.contributeGraphEdge(src, 0, op, 0); + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java ---------------------------------------------------------------------- diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java new file mode 100644 index 0000000..db00b9e --- /dev/null +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java @@ -0,0 +1,140 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.mutable.Mutable; + +import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.common.utils.Pair; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex; +import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor; +import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor; +import edu.uci.ics.hyracks.api.job.JobSpecification; + +public class IndexBulkloadPOperator extends AbstractPhysicalOperator { + + private final List primaryKeys; + private final List secondaryKeys; + private final List additionalFilteringKeys; + private final Mutable filterExpr; + private final IDataSourceIndex dataSourceIndex; + + public IndexBulkloadPOperator(List primaryKeys, List secondaryKeys, + List additionalFilteringKeys, Mutable filterExpr, + IDataSourceIndex dataSourceIndex) { + this.primaryKeys = primaryKeys; + this.secondaryKeys = secondaryKeys; + this.additionalFilteringKeys = additionalFilteringKeys; + this.filterExpr = filterExpr; + this.dataSourceIndex = dataSourceIndex; + } + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.INDEX_BULKLOAD; + } + + @Override + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, + IPhysicalPropertiesVector reqdByParent) { + List scanVariables = new ArrayList<>(); + scanVariables.addAll(primaryKeys); + scanVariables.add(new LogicalVariable(-1)); + IPhysicalPropertiesVector physicalProps = dataSourceIndex.getDataSource().getPropertiesProvider() + .computePropertiesVector(scanVariables); + List localProperties = new ArrayList<>(); + List orderColumns = new ArrayList(); + // Data needs to be sorted based on the [token, number of token, PK] + // OR [token, PK] if the index is not partitioned + for (LogicalVariable skVar : secondaryKeys) { + orderColumns.add(new OrderColumn(skVar, OrderKind.ASC)); + } + for (LogicalVariable pkVar : primaryKeys) { + orderColumns.add(new OrderColumn(pkVar, OrderKind.ASC)); + } + localProperties.add(new LocalOrderProperty(orderColumns)); + StructuralPropertiesVector spv = new StructuralPropertiesVector(physicalProps.getPartitioningProperty(), + localProperties); + return new PhysicalRequirements(new IPhysicalPropertiesVector[] { spv }, + IPartitioningRequirementsCoordinator.NO_COORDINATION); + } + + @Override + public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) + throws AlgebricksException { + AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); + deliveredProperties = op2.getDeliveredPhysicalProperties().clone(); + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + IndexInsertDeleteOperator indexInsertDeleteOp = (IndexInsertDeleteOperator) op; + assert indexInsertDeleteOp.getOperation() == Kind.INSERT; + assert indexInsertDeleteOp.isBulkload(); + + IMetadataProvider mp = context.getMetadataProvider(); + IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op); + JobSpecification spec = builder.getJobSpec(); + RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor( + context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context); + Pair runtimeAndConstraints = mp.getIndexInsertRuntime( + dataSourceIndex, propagatedSchema, inputSchemas, typeEnv, primaryKeys, secondaryKeys, + additionalFilteringKeys, null, inputDesc, context, spec, true); + builder.contributeHyracksOperator(indexInsertDeleteOp, runtimeAndConstraints.first); + builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second); + ILogicalOperator src = indexInsertDeleteOp.getInputs().get(0).getValue(); + builder.contributeGraphEdge(src, 0, indexInsertDeleteOp, 0); + } + + @Override + public boolean isMicroOperator() { + return false; + } + + @Override + public boolean expensiveThanMaterialization() { + return false; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java ---------------------------------------------------------------------- diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java new file mode 100644 index 0000000..4bc4c06 --- /dev/null +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java @@ -0,0 +1,131 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.mutable.Mutable; + +import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.common.utils.Pair; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex; +import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor; +import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor; +import edu.uci.ics.hyracks.api.job.JobSpecification; + +public class IndexInsertDeletePOperator extends AbstractPhysicalOperator { + + private final List primaryKeys; + private final List secondaryKeys; + private final ILogicalExpression filterExpr; + private final IDataSourceIndex dataSourceIndex; + private final List additionalFilteringKeys; + + public IndexInsertDeletePOperator(List primaryKeys, List secondaryKeys, + List additionalFilteringKeys, Mutable filterExpr, + IDataSourceIndex dataSourceIndex) { + this.primaryKeys = primaryKeys; + this.secondaryKeys = secondaryKeys; + if (filterExpr != null) { + this.filterExpr = filterExpr.getValue(); + } else { + this.filterExpr = null; + } + this.dataSourceIndex = dataSourceIndex; + this.additionalFilteringKeys = additionalFilteringKeys; + } + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.INDEX_INSERT_DELETE; + } + + @Override + public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { + AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); + deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone(); + } + + @Override + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, + IPhysicalPropertiesVector reqdByParent) { + List scanVariables = new ArrayList(); + scanVariables.addAll(primaryKeys); + scanVariables.add(new LogicalVariable(-1)); + IPhysicalPropertiesVector r = dataSourceIndex.getDataSource().getPropertiesProvider() + .computePropertiesVector(scanVariables); + r.getLocalProperties().clear(); + IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1]; + requirements[0] = r; + return new PhysicalRequirements(requirements, IPartitioningRequirementsCoordinator.NO_COORDINATION); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + IndexInsertDeleteOperator insertDeleteOp = (IndexInsertDeleteOperator) op; + IMetadataProvider mp = context.getMetadataProvider(); + + JobSpecification spec = builder.getJobSpec(); + RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor( + context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context); + + Pair runtimeAndConstraints = null; + IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(insertDeleteOp); + if (insertDeleteOp.getOperation() == Kind.INSERT) { + runtimeAndConstraints = mp.getIndexInsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv, + primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec, false); + } else { + runtimeAndConstraints = mp.getIndexDeleteRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv, + primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec); + } + builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first); + builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second); + ILogicalOperator src = insertDeleteOp.getInputs().get(0).getValue(); + builder.contributeGraphEdge(src, 0, insertDeleteOp, 0); + } + + @Override + public boolean isMicroOperator() { + return false; + } + + @Override + public boolean expensiveThanMaterialization() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java ---------------------------------------------------------------------- diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java new file mode 100644 index 0000000..993e8d7 --- /dev/null +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java @@ -0,0 +1,121 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical; + +import java.util.ArrayList; +import java.util.List; + +import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.common.utils.Pair; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource; +import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor; +import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor; +import edu.uci.ics.hyracks.api.job.JobSpecification; + +@SuppressWarnings("rawtypes") +public class InsertDeletePOperator extends AbstractPhysicalOperator { + + private LogicalVariable payload; + private List keys; + private IDataSource dataSource; + private final List additionalFilteringKeys; + + public InsertDeletePOperator(LogicalVariable payload, List keys, + List additionalFilteringKeys, IDataSource dataSource) { + this.payload = payload; + this.keys = keys; + this.dataSource = dataSource; + this.additionalFilteringKeys = additionalFilteringKeys; + } + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.INSERT_DELETE; + } + + @Override + public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { + AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); + deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone(); + } + + @Override + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, + IPhysicalPropertiesVector reqdByParent) { + List scanVariables = new ArrayList(); + scanVariables.addAll(keys); + scanVariables.add(new LogicalVariable(-1)); + IPhysicalPropertiesVector r = dataSource.getPropertiesProvider().computePropertiesVector(scanVariables); + r.getLocalProperties().clear(); + IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1]; + requirements[0] = r; + return new PhysicalRequirements(requirements, IPartitioningRequirementsCoordinator.NO_COORDINATION); + } + + @SuppressWarnings("unchecked") + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + InsertDeleteOperator insertDeleteOp = (InsertDeleteOperator) op; + IMetadataProvider mp = context.getMetadataProvider(); + IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op); + JobSpecification spec = builder.getJobSpec(); + RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor( + context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context); + + Pair runtimeAndConstraints = null; + if (insertDeleteOp.getOperation() == Kind.INSERT) { + runtimeAndConstraints = mp.getInsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload, + additionalFilteringKeys, inputDesc, context, spec, false); + } else { + runtimeAndConstraints = mp.getDeleteRuntime(dataSource, propagatedSchema, typeEnv, keys, payload, + additionalFilteringKeys, inputDesc, context, spec); + } + + builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first); + builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second); + ILogicalOperator src = insertDeleteOp.getInputs().get(0).getValue(); + builder.contributeGraphEdge(src, 0, insertDeleteOp, 0); + } + + @Override + public boolean isMicroOperator() { + return false; + } + + @Override + public boolean expensiveThanMaterialization() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java ---------------------------------------------------------------------- diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java new file mode 100644 index 0000000..27f4b25 --- /dev/null +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java @@ -0,0 +1,94 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical; + +import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.common.utils.Pair; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor; +import edu.uci.ics.hyracks.dataflow.std.misc.MaterializingOperatorDescriptor; + +public class MaterializePOperator extends AbstractPhysicalOperator { + + private final boolean isSingleActivity; + + public MaterializePOperator(boolean isSingleActivity) { + this.isSingleActivity = isSingleActivity; + } + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.MATERIALIZE; + } + + @Override + public boolean isMicroOperator() { + return false; + } + + @Override + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, + IPhysicalPropertiesVector reqdByParent) { + return emptyUnaryRequirements(); + } + + @Override + public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) + throws AlgebricksException { + AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); + deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone(); + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), + propagatedSchema, context); + MaterializingOperatorDescriptor materializationOpDesc = new MaterializingOperatorDescriptor( + builder.getJobSpec(), recDescriptor, isSingleActivity); + contributeOpDesc(builder, (AbstractLogicalOperator) op, materializationOpDesc); + ILogicalOperator src = op.getInputs().get(0).getValue(); + builder.contributeGraphEdge(src, 0, op, 0); + } + + @Override + public Pair getInputOutputDependencyLabels(ILogicalOperator op) { + int[] inputDependencyLabels = new int[] { 0 }; + int[] outputDependencyLabels; + if (isSingleActivity) { + outputDependencyLabels = new int[] { 0 }; + } else { + outputDependencyLabels = new int[] { 1 }; + } + return new Pair(inputDependencyLabels, outputDependencyLabels); + } + + @Override + public boolean expensiveThanMaterialization() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java ---------------------------------------------------------------------- diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java new file mode 100644 index 0000000..546a77b --- /dev/null +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java @@ -0,0 +1,94 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical; + +import java.util.List; + +import org.apache.commons.lang3.mutable.Mutable; + +import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.common.utils.Pair; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline; +import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.NestedPlansAccumulatingAggregatorFactory; +import edu.uci.ics.hyracks.algebricks.runtime.operators.group.MicroPreClusteredGroupRuntimeFactory; +import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor; +import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory; + +public class MicroPreclusteredGroupByPOperator extends AbstractPreclusteredGroupByPOperator { + + public MicroPreclusteredGroupByPOperator(List columnList) { + super(columnList); + } + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.MICRO_PRE_CLUSTERED_GROUP_BY; + } + + @Override + public boolean isMicroOperator() { + return true; + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + int keys[] = JobGenHelper.variablesToFieldIndexes(columnList, inputSchemas[0]); + GroupByOperator gby = (GroupByOperator) op; + int numFds = gby.getDecorList().size(); + int fdColumns[] = new int[numFds]; + IVariableTypeEnvironment env = context.getTypeEnvironment(op); + int j = 0; + for (Pair> p : gby.getDecorList()) { + ILogicalExpression expr = p.second.getValue(); + if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) { + throw new AlgebricksException("pre-sorted group-by expects variable references."); + } + VariableReferenceExpression v = (VariableReferenceExpression) expr; + LogicalVariable decor = v.getVariableReference(); + fdColumns[j++] = inputSchemas[0].findVariable(decor); + } + // compile subplans and set the gby op. schema accordingly + AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], gby, opSchema, context); + IAggregatorDescriptorFactory aggregatorFactory = new NestedPlansAccumulatingAggregatorFactory(subplans, keys, + fdColumns); + + IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories( + columnList, env, context); + RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context); + RecordDescriptor inputRecordDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op.getInputs().get(0).getValue()), + inputSchemas[0], context); + MicroPreClusteredGroupRuntimeFactory runtime = new MicroPreClusteredGroupRuntimeFactory(keys, + comparatorFactories, aggregatorFactory, inputRecordDesc, recordDescriptor, null); + builder.contributeMicroOperator(gby, runtime, recordDescriptor); + ILogicalOperator src = op.getInputs().get(0).getValue(); + builder.contributeGraphEdge(src, 0, op, 0); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java ---------------------------------------------------------------------- diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java new file mode 100644 index 0000000..a2e697d --- /dev/null +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java @@ -0,0 +1,288 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical; + +import java.util.LinkedList; +import java.util.List; + +import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector; +import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory; +import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator; +import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor; +import edu.uci.ics.hyracks.api.context.IHyracksTaskContext; +import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor; +import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory; +import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator; +import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory; +import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor; +import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; +import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry; +import edu.uci.ics.hyracks.data.std.api.IPointable; +import edu.uci.ics.hyracks.data.std.primitive.VoidPointable; +import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference; +import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference; +import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor; + +/** + * Left input is broadcast and preserves its local properties. + * Right input can be partitioned in any way. + */ +public class NLJoinPOperator extends AbstractJoinPOperator { + + private final int memSize; + + public NLJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, int memSize) { + super(kind, partitioningType); + this.memSize = memSize; + } + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.NESTED_LOOP; + } + + @Override + public boolean isMicroOperator() { + return false; + } + + @Override + public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context) { + if (partitioningType != JoinPartitioningType.BROADCAST) { + throw new NotImplementedException(partitioningType + " nested loop joins are not implemented."); + } + + IPartitioningProperty pp; + + AbstractLogicalOperator op = (AbstractLogicalOperator) iop; + + if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) { + AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(1).getValue(); + IPhysicalPropertiesVector pv1 = op2.getPhysicalOperator().getDeliveredProperties(); + if (pv1 == null) { + pp = null; + } else { + pp = pv1.getPartitioningProperty(); + } + } else { + pp = IPartitioningProperty.UNPARTITIONED; + } + + List localProps = new LinkedList(); + this.deliveredProperties = new StructuralPropertiesVector(pp, localProps); + } + + @Override + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, + IPhysicalPropertiesVector reqdByParent) { + if (partitioningType != JoinPartitioningType.BROADCAST) { + throw new NotImplementedException(partitioningType + " nested loop joins are not implemented."); + } + + StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2]; + pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null); + pv[1] = new StructuralPropertiesVector(null, null); + return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION); + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op; + RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), + propagatedSchema, context); + IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1]; + conditionInputSchemas[0] = propagatedSchema; + IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider(); + IScalarEvaluatorFactory cond = expressionRuntimeProvider.createEvaluatorFactory(join.getCondition().getValue(), + context.getTypeEnvironment(op), conditionInputSchemas, context); + ITuplePairComparatorFactory comparatorFactory = new TuplePairEvaluatorFactory(cond, + context.getBinaryBooleanInspectorFactory()); + IOperatorDescriptorRegistry spec = builder.getJobSpec(); + IOperatorDescriptor opDesc = null; + + switch (kind) { + case INNER: { + opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, false, + null); + break; + } + case LEFT_OUTER: { + INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()]; + for (int j = 0; j < nullWriterFactories.length; j++) { + nullWriterFactories[j] = context.getNullWriterFactory(); + } + opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, true, + nullWriterFactories); + break; + } + default: { + throw new NotImplementedException(); + } + } + contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc); + + ILogicalOperator src1 = op.getInputs().get(0).getValue(); + builder.contributeGraphEdge(src1, 0, op, 0); + ILogicalOperator src2 = op.getInputs().get(1).getValue(); + builder.contributeGraphEdge(src2, 0, op, 1); + } + + public static class TuplePairEvaluatorFactory implements ITuplePairComparatorFactory { + + private static final long serialVersionUID = 1L; + private final IScalarEvaluatorFactory cond; + private final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory; + + public TuplePairEvaluatorFactory(IScalarEvaluatorFactory cond, + IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) { + this.cond = cond; + this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory; + } + + @Override + public synchronized ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) { + return new TuplePairEvaluator(ctx, cond, binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx)); + } + } + + public static class TuplePairEvaluator implements ITuplePairComparator { + private final IHyracksTaskContext ctx; + private IScalarEvaluator condEvaluator; + private final IScalarEvaluatorFactory condFactory; + private final IPointable p; + private final CompositeFrameTupleReference compositeTupleRef; + private final FrameTupleReference leftRef; + private final FrameTupleReference rightRef; + private final IBinaryBooleanInspector binaryBooleanInspector; + + public TuplePairEvaluator(IHyracksTaskContext ctx, IScalarEvaluatorFactory condFactory, + IBinaryBooleanInspector binaryBooleanInspector) { + this.ctx = ctx; + this.condFactory = condFactory; + this.binaryBooleanInspector = binaryBooleanInspector; + this.leftRef = new FrameTupleReference(); + this.p = VoidPointable.FACTORY.createPointable(); + this.rightRef = new FrameTupleReference(); + this.compositeTupleRef = new CompositeFrameTupleReference(leftRef, rightRef); + } + + @Override + public int compare(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor, + int innerIndex) throws HyracksDataException { + if (condEvaluator == null) { + try { + this.condEvaluator = condFactory.createScalarEvaluator(ctx); + } catch (AlgebricksException ae) { + throw new HyracksDataException(ae); + } + } + compositeTupleRef.reset(outerAccessor, outerIndex, innerAccessor, innerIndex); + try { + condEvaluator.evaluate(compositeTupleRef, p); + } catch (AlgebricksException ae) { + throw new HyracksDataException(ae); + } + boolean result = binaryBooleanInspector + .getBooleanValue(p.getByteArray(), p.getStartOffset(), p.getLength()); + if (result) + return 0; + else + return 1; + } + } + + public static class CompositeFrameTupleReference implements IFrameTupleReference { + + private final FrameTupleReference refLeft; + private final FrameTupleReference refRight; + + public CompositeFrameTupleReference(FrameTupleReference refLeft, FrameTupleReference refRight) { + this.refLeft = refLeft; + this.refRight = refRight; + } + + public void reset(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor, + int innerIndex) { + refLeft.reset(outerAccessor, outerIndex); + refRight.reset(innerAccessor, innerIndex); + } + + @Override + public int getFieldCount() { + return refLeft.getFieldCount() + refRight.getFieldCount(); + } + + @Override + public byte[] getFieldData(int fIdx) { + int leftFieldCount = refLeft.getFieldCount(); + if (fIdx < leftFieldCount) + return refLeft.getFieldData(fIdx); + else + return refRight.getFieldData(fIdx - leftFieldCount); + } + + @Override + public int getFieldStart(int fIdx) { + int leftFieldCount = refLeft.getFieldCount(); + if (fIdx < leftFieldCount) + return refLeft.getFieldStart(fIdx); + else + return refRight.getFieldStart(fIdx - leftFieldCount); + } + + @Override + public int getFieldLength(int fIdx) { + int leftFieldCount = refLeft.getFieldCount(); + if (fIdx < leftFieldCount) + return refLeft.getFieldLength(fIdx); + else + return refRight.getFieldLength(fIdx - leftFieldCount); + } + + @Override + public IFrameTupleAccessor getFrameTupleAccessor() { + throw new NotImplementedException(); + } + + @Override + public int getTupleIndex() { + throw new NotImplementedException(); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java ---------------------------------------------------------------------- diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java new file mode 100644 index 0000000..4fdfa32 --- /dev/null +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java @@ -0,0 +1,107 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.mutable.Mutable; + +import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory; +import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor; + +public class NestedTupleSourcePOperator extends AbstractPhysicalOperator { + + public NestedTupleSourcePOperator() { + } + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.NESTED_TUPLE_SOURCE; + } + + @Override + public boolean isMicroOperator() { + return true; + } + + @Override + public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { + Mutable dataSource = ((NestedTupleSourceOperator) op).getDataSourceReference(); + AbstractLogicalOperator op2 = (AbstractLogicalOperator) dataSource.getValue().getInputs().get(0).getValue(); + IPhysicalPropertiesVector inheritedProps = op2.getDeliveredPhysicalProperties(); + AbstractLogicalOperator parent = (AbstractLogicalOperator) dataSource.getValue(); + if (parent.getOperatorTag() == LogicalOperatorTag.GROUP) { + // The following part computes the data property regarding to each particular group. + // TODO(buyingyi): we need to add the original data property as well. But currently + // there are places assuming there is only one LocalOrderProperty and one + // LocalGroupingProperty delivered by an operator. + GroupByOperator gby = (GroupByOperator) parent; + List originalLocalProperties = inheritedProps.getLocalProperties(); + List newLocalProperties = null; + if (originalLocalProperties != null) { + newLocalProperties = new ArrayList(); + for (ILocalStructuralProperty lsp : inheritedProps.getLocalProperties()) { + ILocalStructuralProperty newLsp = lsp.regardToGroup(gby.getGbyVarList()); + if (newLsp != null) { + newLocalProperties.add(newLsp); + } + } + } + deliveredProperties = new StructuralPropertiesVector(inheritedProps.getPartitioningProperty(), + newLocalProperties); + } else { + deliveredProperties = inheritedProps.clone(); + } + } + + @Override + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, + IPhysicalPropertiesVector reqdByParent) { + return null; + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + propagatedSchema.addAllVariables(outerPlanSchema); + NestedTupleSourceRuntimeFactory runtime = new NestedTupleSourceRuntimeFactory(); + RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, + context); + builder.contributeMicroOperator(op, runtime, recDesc); + } + + @Override + public boolean expensiveThanMaterialization() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java ---------------------------------------------------------------------- diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java new file mode 100644 index 0000000..456744e --- /dev/null +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java @@ -0,0 +1,61 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical; + +import edu.uci.ics.hyracks.algebricks.common.utils.Pair; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor; +import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry; +import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; + +public class OneToOneExchangePOperator extends AbstractExchangePOperator { + + public OneToOneExchangePOperator() { + } + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.ONE_TO_ONE_EXCHANGE; + } + + @Override + public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { + AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); + deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone(); + } + + @Override + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, + IPhysicalPropertiesVector reqdByParent) { + return emptyUnaryRequirements(); + } + + @Override + public Pair createConnectorDescriptor(IConnectorDescriptorRegistry spec, + ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) { + IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec); + return new Pair(conn, TargetConstraint.SAME_COUNT); + } + +}