asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ima...@apache.org
Subject [26/51] [partial] incubator-asterixdb-hyracks git commit: Change folder structure for Java repackage
Date Tue, 25 Aug 2015 16:41:39 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
new file mode 100644
index 0000000..3abdec9
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -0,0 +1,281 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
+
+public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
+
+    private final int memSizeInFrames;
+    private final int maxInputBuildSizeInFrames;
+    private final int aveRecordsPerFrame;
+    private final double fudgeFactor;
+
+    private static final Logger LOGGER = Logger.getLogger(HybridHashJoinPOperator.class.getName());
+
+    public HybridHashJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
+            List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities,
+            int memSizeInFrames, int maxInputSizeInFrames, int aveRecordsPerFrame, double fudgeFactor) {
+        super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities);
+        this.memSizeInFrames = memSizeInFrames;
+        this.maxInputBuildSizeInFrames = maxInputSizeInFrames;
+        this.aveRecordsPerFrame = aveRecordsPerFrame;
+        this.fudgeFactor = fudgeFactor;
+
+        LOGGER.fine("HybridHashJoinPOperator constructed with: JoinKind=" + kind + ", JoinPartitioningType="
+                + partitioningType + ", List<LogicalVariable>=" + sideLeftOfEqualities + ", List<LogicalVariable>="
+                + sideRightOfEqualities + ", int memSizeInFrames=" + memSizeInFrames + ", int maxInputSize0InFrames="
+                + maxInputSizeInFrames + ", int aveRecordsPerFrame=" + aveRecordsPerFrame + ", double fudgeFactor="
+                + fudgeFactor + ".");
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.HYBRID_HASH_JOIN;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    public double getFudgeFactor() {
+        return fudgeFactor;
+    }
+
+    public int getMemSizeInFrames() {
+        return memSizeInFrames;
+    }
+
+    @Override
+    public String toString() {
+        return getOperatorTag().toString() + " " + keysLeftBranch + keysRightBranch;
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]);
+        int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]);
+        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+        IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper.variablesToBinaryHashFunctionFactories(
+                keysLeftBranch, env, context);
+        IBinaryHashFunctionFamily[] hashFunFamilies = JobGenHelper.variablesToBinaryHashFunctionFamilies(
+                keysLeftBranch, env, context);
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
+        int i = 0;
+        IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+        for (LogicalVariable v : keysLeftBranch) {
+            Object t = env.getVarType(v);
+            comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
+        }
+
+        IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider = context
+                .getPredicateEvaluatorFactoryProvider();
+        IPredicateEvaluatorFactory predEvaluatorFactory = (predEvaluatorFactoryProvider == null ? null
+                : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight));
+
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                propagatedSchema, context);
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        IOperatorDescriptor opDesc = null;
+
+        boolean optimizedHashJoin = true;
+        for (IBinaryHashFunctionFamily family : hashFunFamilies) {
+            if (family == null) {
+                optimizedHashJoin = false;
+                break;
+            }
+        }
+
+        if (!optimizedHashJoin) {
+            try {
+                switch (kind) {
+                    case INNER: {
+                        opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+                                maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
+                                hashFunFactories, comparatorFactories, recDescriptor, predEvaluatorFactory);
+                        break;
+                    }
+                    case LEFT_OUTER: {
+                        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
+                        for (int j = 0; j < nullWriterFactories.length; j++) {
+                            nullWriterFactories[j] = context.getNullWriterFactory();
+                        }
+                        opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+                                maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
+                                hashFunFactories, comparatorFactories, recDescriptor, predEvaluatorFactory, true,
+                                nullWriterFactories);
+                        break;
+                    }
+                    default: {
+                        throw new NotImplementedException();
+                    }
+                }
+            } catch (HyracksDataException e) {
+                throw new AlgebricksException(e);
+            }
+        } else {
+            try {
+                switch (kind) {
+                    case INNER: {
+                        opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+                                maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
+                                comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
+                                        keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
+                                        keysRight, keysLeft), predEvaluatorFactory);
+                        break;
+                    }
+                    case LEFT_OUTER: {
+                        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
+                        for (int j = 0; j < nullWriterFactories.length; j++) {
+                            nullWriterFactories[j] = context.getNullWriterFactory();
+                        }
+                        opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+                                maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
+                                comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
+                                        keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
+                                        keysRight, keysLeft), predEvaluatorFactory, true, nullWriterFactories);
+                        break;
+                    }
+                    default: {
+                        throw new NotImplementedException();
+                    }
+                }
+            } catch (HyracksDataException e) {
+                throw new AlgebricksException(e);
+            }
+        }
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+        ILogicalOperator src1 = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src1, 0, op, 0);
+        ILogicalOperator src2 = op.getInputs().get(1).getValue();
+        builder.contributeGraphEdge(src2, 0, op, 1);
+    }
+
+    @Override
+    protected List<ILocalStructuralProperty> deliveredLocalProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        return new LinkedList<ILocalStructuralProperty>();
+    }
+
+}
+
+/**
+ * {@ ITuplePairComparatorFactory} implementation for optimized hybrid hash join.
+ */
+class JoinMultiComparatorFactory implements ITuplePairComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    private final IBinaryComparatorFactory[] binaryComparatorFactories;
+    private final int[] keysLeft;
+    private final int[] keysRight;
+
+    public JoinMultiComparatorFactory(IBinaryComparatorFactory[] binaryComparatorFactory, int[] keysLeft,
+            int[] keysRight) {
+        this.binaryComparatorFactories = binaryComparatorFactory;
+        this.keysLeft = keysLeft;
+        this.keysRight = keysRight;
+    }
+
+    @Override
+    public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
+        IBinaryComparator[] binaryComparators = new IBinaryComparator[binaryComparatorFactories.length];
+        for (int i = 0; i < binaryComparators.length; i++) {
+            binaryComparators[i] = binaryComparatorFactories[i].createBinaryComparator();
+        }
+        return new JoinMultiComparator(binaryComparators, keysLeft, keysRight);
+    }
+}
+
+/**
+ * {@ ITuplePairComparator} implementation for optimized hybrid hash join.
+ * The comparator applies multiple binary comparators, one for each key pairs
+ */
+class JoinMultiComparator implements ITuplePairComparator {
+    private final IBinaryComparator[] binaryComparators;
+    private final int[] keysLeft;
+    private final int[] keysRight;
+
+    public JoinMultiComparator(IBinaryComparator[] bComparator, int[] keysLeft, int[] keysRight) {
+        this.binaryComparators = bComparator;
+        this.keysLeft = keysLeft;
+        this.keysRight = keysRight;
+    }
+
+    @Override
+    public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
+            throws HyracksDataException {
+        int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+        int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+        int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+        int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+        for (int i = 0; i < binaryComparators.length; i++) {
+            int fStart0 = accessor0.getFieldStartOffset(tIndex0, keysLeft[i]);
+            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, keysLeft[i]);
+            int fLen0 = fEnd0 - fStart0;
+
+            int fStart1 = accessor1.getFieldStartOffset(tIndex1, keysRight[i]);
+            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, keysRight[i]);
+            int fLen1 = fEnd1 - fStart1;
+
+            int c = binaryComparators[i].compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0,
+                    accessor1.getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
new file mode 100644
index 0000000..55616f9
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
+
+public class InMemoryHashJoinPOperator extends AbstractHashJoinPOperator {
+
+    private final int tableSize;
+
+    /**
+     * builds on the first operator and probes on the second.
+     */
+
+    public InMemoryHashJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
+            List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities, int tableSize) {
+        super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities);
+        this.tableSize = tableSize;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.IN_MEMORY_HASH_JOIN;
+    }
+
+    @Override
+    public String toString() {
+        return getOperatorTag().toString() + " " + keysLeftBranch + keysRightBranch;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]);
+        int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]);
+        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+        IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper.variablesToBinaryHashFunctionFactories(
+                keysLeftBranch, env, context);
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
+        int i = 0;
+        IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+        for (LogicalVariable v : keysLeftBranch) {
+            Object t = env.getVarType(v);
+            comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
+        }
+        
+        IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider = context.getPredicateEvaluatorFactoryProvider();
+        IPredicateEvaluatorFactory predEvaluatorFactory = ( predEvaluatorFactoryProvider == null ? null : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight));
+        
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                propagatedSchema, context);
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        IOperatorDescriptor opDesc = null;
+
+        switch (kind) {
+            case INNER: {
+                opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, hashFunFactories,
+                        comparatorFactories, recDescriptor, tableSize, predEvaluatorFactory);
+                break;
+            }
+            case LEFT_OUTER: {
+                INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
+                for (int j = 0; j < nullWriterFactories.length; j++) {
+                    nullWriterFactories[j] = context.getNullWriterFactory();
+                }
+                opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, hashFunFactories,
+                        comparatorFactories, predEvaluatorFactory, recDescriptor, true, nullWriterFactories, tableSize);
+                break;
+            }
+            default: {
+                throw new NotImplementedException();
+            }
+        }
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+        ILogicalOperator src1 = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src1, 0, op, 0);
+        ILogicalOperator src2 = op.getInputs().get(1).getValue();
+        builder.contributeGraphEdge(src2, 0, op, 1);
+    }
+
+    @Override
+    protected List<ILocalStructuralProperty> deliveredLocalProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op0 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        IPhysicalPropertiesVector pv0 = op0.getPhysicalOperator().getDeliveredProperties();
+        List<ILocalStructuralProperty> lp0 = pv0.getLocalProperties();
+        if (lp0 != null) {
+            // maintains the local properties on the probe side
+            return new LinkedList<ILocalStructuralProperty>(lp0);
+        }
+        return new LinkedList<ILocalStructuralProperty>();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryStableSortPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryStableSortPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryStableSortPOperator.java
new file mode 100644
index 0000000..4a7d875
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryStableSortPOperator.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.sort.InMemorySortRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class InMemoryStableSortPOperator extends AbstractStableSortPOperator {
+
+    public InMemoryStableSortPOperator() {
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.IN_MEMORY_STABLE_SORT;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+        int n = sortColumns.length;
+        int[] sortFields = new int[n];
+        IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+        int i = 0;
+        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+        INormalizedKeyComputerFactory nkcf = null;
+        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+        for (OrderColumn oc : sortColumns) {
+            LogicalVariable var = oc.getColumn();
+            sortFields[i] = opSchema.findVariable(var);
+            Object type = env.getVarType(var);
+            OrderKind order = oc.getOrder();
+            if (i == 0 && nkcfProvider != null && type != null) {
+                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
+            }
+
+            IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+            comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
+            i++;
+        }
+
+        IPushRuntimeFactory runtime = new InMemorySortRuntimeFactory(sortFields, nkcf, comps, null);
+        builder.contributeMicroOperator(op, runtime, recDescriptor);
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
new file mode 100644
index 0000000..db00b9e
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class IndexBulkloadPOperator extends AbstractPhysicalOperator {
+
+    private final List<LogicalVariable> primaryKeys;
+    private final List<LogicalVariable> secondaryKeys;
+    private final List<LogicalVariable> additionalFilteringKeys;
+    private final Mutable<ILogicalExpression> filterExpr;
+    private final IDataSourceIndex<?, ?> dataSourceIndex;
+
+    public IndexBulkloadPOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            List<LogicalVariable> additionalFilteringKeys, Mutable<ILogicalExpression> filterExpr,
+            IDataSourceIndex<?, ?> dataSourceIndex) {
+        this.primaryKeys = primaryKeys;
+        this.secondaryKeys = secondaryKeys;
+        this.additionalFilteringKeys = additionalFilteringKeys;
+        this.filterExpr = filterExpr;
+        this.dataSourceIndex = dataSourceIndex;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.INDEX_BULKLOAD;
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        List<LogicalVariable> scanVariables = new ArrayList<>();
+        scanVariables.addAll(primaryKeys);
+        scanVariables.add(new LogicalVariable(-1));
+        IPhysicalPropertiesVector physicalProps = dataSourceIndex.getDataSource().getPropertiesProvider()
+                .computePropertiesVector(scanVariables);
+        List<ILocalStructuralProperty> localProperties = new ArrayList<>();
+        List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
+        // Data needs to be sorted based on the [token, number of token, PK]
+        // OR [token, PK] if the index is not partitioned
+        for (LogicalVariable skVar : secondaryKeys) {
+            orderColumns.add(new OrderColumn(skVar, OrderKind.ASC));
+        }
+        for (LogicalVariable pkVar : primaryKeys) {
+            orderColumns.add(new OrderColumn(pkVar, OrderKind.ASC));
+        }
+        localProperties.add(new LocalOrderProperty(orderColumns));
+        StructuralPropertiesVector spv = new StructuralPropertiesVector(physicalProps.getPartitioningProperty(),
+                localProperties);
+        return new PhysicalRequirements(new IPhysicalPropertiesVector[] { spv },
+                IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        IndexInsertDeleteOperator indexInsertDeleteOp = (IndexInsertDeleteOperator) op;
+        assert indexInsertDeleteOp.getOperation() == Kind.INSERT;
+        assert indexInsertDeleteOp.isBulkload();
+
+        IMetadataProvider mp = context.getMetadataProvider();
+        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+        JobSpecification spec = builder.getJobSpec();
+        RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+                context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getIndexInsertRuntime(
+                dataSourceIndex, propagatedSchema, inputSchemas, typeEnv, primaryKeys, secondaryKeys,
+                additionalFilteringKeys, null, inputDesc, context, spec, true);
+        builder.contributeHyracksOperator(indexInsertDeleteOp, runtimeAndConstraints.first);
+        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+        ILogicalOperator src = indexInsertDeleteOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, indexInsertDeleteOp, 0);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
new file mode 100644
index 0000000..4bc4c06
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class IndexInsertDeletePOperator extends AbstractPhysicalOperator {
+
+    private final List<LogicalVariable> primaryKeys;
+    private final List<LogicalVariable> secondaryKeys;
+    private final ILogicalExpression filterExpr;
+    private final IDataSourceIndex<?, ?> dataSourceIndex;
+    private final List<LogicalVariable> additionalFilteringKeys;
+
+    public IndexInsertDeletePOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            List<LogicalVariable> additionalFilteringKeys, Mutable<ILogicalExpression> filterExpr,
+            IDataSourceIndex<?, ?> dataSourceIndex) {
+        this.primaryKeys = primaryKeys;
+        this.secondaryKeys = secondaryKeys;
+        if (filterExpr != null) {
+            this.filterExpr = filterExpr.getValue();
+        } else {
+            this.filterExpr = null;
+        }
+        this.dataSourceIndex = dataSourceIndex;
+        this.additionalFilteringKeys = additionalFilteringKeys;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.INDEX_INSERT_DELETE;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
+        scanVariables.addAll(primaryKeys);
+        scanVariables.add(new LogicalVariable(-1));
+        IPhysicalPropertiesVector r = dataSourceIndex.getDataSource().getPropertiesProvider()
+                .computePropertiesVector(scanVariables);
+        r.getLocalProperties().clear();
+        IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
+        requirements[0] = r;
+        return new PhysicalRequirements(requirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        IndexInsertDeleteOperator insertDeleteOp = (IndexInsertDeleteOperator) op;
+        IMetadataProvider mp = context.getMetadataProvider();
+
+        JobSpecification spec = builder.getJobSpec();
+        RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+                context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = null;
+        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(insertDeleteOp);
+        if (insertDeleteOp.getOperation() == Kind.INSERT) {
+            runtimeAndConstraints = mp.getIndexInsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
+                    primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec, false);
+        } else {
+            runtimeAndConstraints = mp.getIndexDeleteRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
+                    primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec);
+        }
+        builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first);
+        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+        ILogicalOperator src = insertDeleteOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, insertDeleteOp, 0);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
new file mode 100644
index 0000000..993e8d7
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+@SuppressWarnings("rawtypes")
+public class InsertDeletePOperator extends AbstractPhysicalOperator {
+
+    private LogicalVariable payload;
+    private List<LogicalVariable> keys;
+    private IDataSource<?> dataSource;
+    private final List<LogicalVariable> additionalFilteringKeys;
+
+    public InsertDeletePOperator(LogicalVariable payload, List<LogicalVariable> keys,
+            List<LogicalVariable> additionalFilteringKeys, IDataSource dataSource) {
+        this.payload = payload;
+        this.keys = keys;
+        this.dataSource = dataSource;
+        this.additionalFilteringKeys = additionalFilteringKeys;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.INSERT_DELETE;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
+        scanVariables.addAll(keys);
+        scanVariables.add(new LogicalVariable(-1));
+        IPhysicalPropertiesVector r = dataSource.getPropertiesProvider().computePropertiesVector(scanVariables);
+        r.getLocalProperties().clear();
+        IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
+        requirements[0] = r;
+        return new PhysicalRequirements(requirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        InsertDeleteOperator insertDeleteOp = (InsertDeleteOperator) op;
+        IMetadataProvider mp = context.getMetadataProvider();
+        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+        JobSpecification spec = builder.getJobSpec();
+        RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+                context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = null;
+        if (insertDeleteOp.getOperation() == Kind.INSERT) {
+            runtimeAndConstraints = mp.getInsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
+                    additionalFilteringKeys, inputDesc, context, spec, false);
+        } else {
+            runtimeAndConstraints = mp.getDeleteRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
+                    additionalFilteringKeys, inputDesc, context, spec);
+        }
+
+        builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first);
+        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+        ILogicalOperator src = insertDeleteOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, insertDeleteOp, 0);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
new file mode 100644
index 0000000..27f4b25
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.MaterializingOperatorDescriptor;
+
+public class MaterializePOperator extends AbstractPhysicalOperator {
+
+    private final boolean isSingleActivity;
+
+    public MaterializePOperator(boolean isSingleActivity) {
+        this.isSingleActivity = isSingleActivity;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.MATERIALIZE;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                propagatedSchema, context);
+        MaterializingOperatorDescriptor materializationOpDesc = new MaterializingOperatorDescriptor(
+                builder.getJobSpec(), recDescriptor, isSingleActivity);
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, materializationOpDesc);
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+
+    @Override
+    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+        int[] inputDependencyLabels = new int[] { 0 };
+        int[] outputDependencyLabels;
+        if (isSingleActivity) {
+            outputDependencyLabels = new int[] { 0 };
+        } else {
+            outputDependencyLabels = new int[] { 1 };
+        }
+        return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
new file mode 100644
index 0000000..546a77b
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.NestedPlansAccumulatingAggregatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.group.MicroPreClusteredGroupRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class MicroPreclusteredGroupByPOperator extends AbstractPreclusteredGroupByPOperator {
+
+    public MicroPreclusteredGroupByPOperator(List<LogicalVariable> columnList) {
+        super(columnList);
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.MICRO_PRE_CLUSTERED_GROUP_BY;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        int keys[] = JobGenHelper.variablesToFieldIndexes(columnList, inputSchemas[0]);
+        GroupByOperator gby = (GroupByOperator) op;
+        int numFds = gby.getDecorList().size();
+        int fdColumns[] = new int[numFds];
+        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+        int j = 0;
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList()) {
+            ILogicalExpression expr = p.second.getValue();
+            if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                throw new AlgebricksException("pre-sorted group-by expects variable references.");
+            }
+            VariableReferenceExpression v = (VariableReferenceExpression) expr;
+            LogicalVariable decor = v.getVariableReference();
+            fdColumns[j++] = inputSchemas[0].findVariable(decor);
+        }
+        // compile subplans and set the gby op. schema accordingly
+        AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], gby, opSchema, context);
+        IAggregatorDescriptorFactory aggregatorFactory = new NestedPlansAccumulatingAggregatorFactory(subplans, keys,
+                fdColumns);
+
+        IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
+                columnList, env, context);
+        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+        RecordDescriptor inputRecordDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op.getInputs().get(0).getValue()),
+                inputSchemas[0], context);
+        MicroPreClusteredGroupRuntimeFactory runtime = new MicroPreClusteredGroupRuntimeFactory(keys,
+                comparatorFactories, aggregatorFactory, inputRecordDesc, recordDescriptor, null);
+        builder.contributeMicroOperator(gby, runtime, recordDescriptor);
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
new file mode 100644
index 0000000..a2e697d
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
@@ -0,0 +1,288 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
+
+/**
+ * Left input is broadcast and preserves its local properties.
+ * Right input can be partitioned in any way.
+ */
+public class NLJoinPOperator extends AbstractJoinPOperator {
+
+    private final int memSize;
+
+    public NLJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, int memSize) {
+        super(kind, partitioningType);
+        this.memSize = memSize;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.NESTED_LOOP;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context) {
+        if (partitioningType != JoinPartitioningType.BROADCAST) {
+            throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
+        }
+
+        IPartitioningProperty pp;
+
+        AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
+
+        if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
+            AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(1).getValue();
+            IPhysicalPropertiesVector pv1 = op2.getPhysicalOperator().getDeliveredProperties();
+            if (pv1 == null) {
+                pp = null;
+            } else {
+                pp = pv1.getPartitioningProperty();
+            }
+        } else {
+            pp = IPartitioningProperty.UNPARTITIONED;
+        }
+
+        List<ILocalStructuralProperty> localProps = new LinkedList<ILocalStructuralProperty>();
+        this.deliveredProperties = new StructuralPropertiesVector(pp, localProps);
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        if (partitioningType != JoinPartitioningType.BROADCAST) {
+            throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
+        }
+
+        StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
+        pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null);
+        pv[1] = new StructuralPropertiesVector(null, null);
+        return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op;
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                propagatedSchema, context);
+        IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1];
+        conditionInputSchemas[0] = propagatedSchema;
+        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+        IScalarEvaluatorFactory cond = expressionRuntimeProvider.createEvaluatorFactory(join.getCondition().getValue(),
+                context.getTypeEnvironment(op), conditionInputSchemas, context);
+        ITuplePairComparatorFactory comparatorFactory = new TuplePairEvaluatorFactory(cond,
+                context.getBinaryBooleanInspectorFactory());
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        IOperatorDescriptor opDesc = null;
+
+        switch (kind) {
+            case INNER: {
+                opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, false,
+                        null);
+                break;
+            }
+            case LEFT_OUTER: {
+                INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
+                for (int j = 0; j < nullWriterFactories.length; j++) {
+                    nullWriterFactories[j] = context.getNullWriterFactory();
+                }
+                opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, true,
+                        nullWriterFactories);
+                break;
+            }
+            default: {
+                throw new NotImplementedException();
+            }
+        }
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+        ILogicalOperator src1 = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src1, 0, op, 0);
+        ILogicalOperator src2 = op.getInputs().get(1).getValue();
+        builder.contributeGraphEdge(src2, 0, op, 1);
+    }
+
+    public static class TuplePairEvaluatorFactory implements ITuplePairComparatorFactory {
+
+        private static final long serialVersionUID = 1L;
+        private final IScalarEvaluatorFactory cond;
+        private final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
+
+        public TuplePairEvaluatorFactory(IScalarEvaluatorFactory cond,
+                IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) {
+            this.cond = cond;
+            this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory;
+        }
+
+        @Override
+        public synchronized ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
+            return new TuplePairEvaluator(ctx, cond, binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx));
+        }
+    }
+
+    public static class TuplePairEvaluator implements ITuplePairComparator {
+        private final IHyracksTaskContext ctx;
+        private IScalarEvaluator condEvaluator;
+        private final IScalarEvaluatorFactory condFactory;
+        private final IPointable p;
+        private final CompositeFrameTupleReference compositeTupleRef;
+        private final FrameTupleReference leftRef;
+        private final FrameTupleReference rightRef;
+        private final IBinaryBooleanInspector binaryBooleanInspector;
+
+        public TuplePairEvaluator(IHyracksTaskContext ctx, IScalarEvaluatorFactory condFactory,
+                IBinaryBooleanInspector binaryBooleanInspector) {
+            this.ctx = ctx;
+            this.condFactory = condFactory;
+            this.binaryBooleanInspector = binaryBooleanInspector;
+            this.leftRef = new FrameTupleReference();
+            this.p = VoidPointable.FACTORY.createPointable();
+            this.rightRef = new FrameTupleReference();
+            this.compositeTupleRef = new CompositeFrameTupleReference(leftRef, rightRef);
+        }
+
+        @Override
+        public int compare(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor,
+                int innerIndex) throws HyracksDataException {
+            if (condEvaluator == null) {
+                try {
+                    this.condEvaluator = condFactory.createScalarEvaluator(ctx);
+                } catch (AlgebricksException ae) {
+                    throw new HyracksDataException(ae);
+                }
+            }
+            compositeTupleRef.reset(outerAccessor, outerIndex, innerAccessor, innerIndex);
+            try {
+                condEvaluator.evaluate(compositeTupleRef, p);
+            } catch (AlgebricksException ae) {
+                throw new HyracksDataException(ae);
+            }
+            boolean result = binaryBooleanInspector
+                    .getBooleanValue(p.getByteArray(), p.getStartOffset(), p.getLength());
+            if (result)
+                return 0;
+            else
+                return 1;
+        }
+    }
+
+    public static class CompositeFrameTupleReference implements IFrameTupleReference {
+
+        private final FrameTupleReference refLeft;
+        private final FrameTupleReference refRight;
+
+        public CompositeFrameTupleReference(FrameTupleReference refLeft, FrameTupleReference refRight) {
+            this.refLeft = refLeft;
+            this.refRight = refRight;
+        }
+
+        public void reset(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor,
+                int innerIndex) {
+            refLeft.reset(outerAccessor, outerIndex);
+            refRight.reset(innerAccessor, innerIndex);
+        }
+
+        @Override
+        public int getFieldCount() {
+            return refLeft.getFieldCount() + refRight.getFieldCount();
+        }
+
+        @Override
+        public byte[] getFieldData(int fIdx) {
+            int leftFieldCount = refLeft.getFieldCount();
+            if (fIdx < leftFieldCount)
+                return refLeft.getFieldData(fIdx);
+            else
+                return refRight.getFieldData(fIdx - leftFieldCount);
+        }
+
+        @Override
+        public int getFieldStart(int fIdx) {
+            int leftFieldCount = refLeft.getFieldCount();
+            if (fIdx < leftFieldCount)
+                return refLeft.getFieldStart(fIdx);
+            else
+                return refRight.getFieldStart(fIdx - leftFieldCount);
+        }
+
+        @Override
+        public int getFieldLength(int fIdx) {
+            int leftFieldCount = refLeft.getFieldCount();
+            if (fIdx < leftFieldCount)
+                return refLeft.getFieldLength(fIdx);
+            else
+                return refRight.getFieldLength(fIdx - leftFieldCount);
+        }
+
+        @Override
+        public IFrameTupleAccessor getFrameTupleAccessor() {
+            throw new NotImplementedException();
+        }
+
+        @Override
+        public int getTupleIndex() {
+            throw new NotImplementedException();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
new file mode 100644
index 0000000..4fdfa32
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class NestedTupleSourcePOperator extends AbstractPhysicalOperator {
+
+    public NestedTupleSourcePOperator() {
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.NESTED_TUPLE_SOURCE;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        Mutable<ILogicalOperator> dataSource = ((NestedTupleSourceOperator) op).getDataSourceReference();
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) dataSource.getValue().getInputs().get(0).getValue();
+        IPhysicalPropertiesVector inheritedProps = op2.getDeliveredPhysicalProperties();
+        AbstractLogicalOperator parent = (AbstractLogicalOperator) dataSource.getValue();
+        if (parent.getOperatorTag() == LogicalOperatorTag.GROUP) {
+            // The following part computes the data property regarding to each particular group.
+            // TODO(buyingyi): we need to add the original data property as well. But currently
+            // there are places assuming there is only one LocalOrderProperty and one
+            // LocalGroupingProperty delivered by an operator.
+            GroupByOperator gby = (GroupByOperator) parent;
+            List<ILocalStructuralProperty> originalLocalProperties = inheritedProps.getLocalProperties();
+            List<ILocalStructuralProperty> newLocalProperties = null;
+            if (originalLocalProperties != null) {
+                newLocalProperties = new ArrayList<ILocalStructuralProperty>();
+                for (ILocalStructuralProperty lsp : inheritedProps.getLocalProperties()) {
+                    ILocalStructuralProperty newLsp = lsp.regardToGroup(gby.getGbyVarList());
+                    if (newLsp != null) {
+                        newLocalProperties.add(newLsp);
+                    }
+                }
+            }
+            deliveredProperties = new StructuralPropertiesVector(inheritedProps.getPartitioningProperty(),
+                    newLocalProperties);
+        } else {
+            deliveredProperties = inheritedProps.clone();
+        }
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        return null;
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        propagatedSchema.addAllVariables(outerPlanSchema);
+        NestedTupleSourceRuntimeFactory runtime = new NestedTupleSourceRuntimeFactory();
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+                context);
+        builder.contributeMicroOperator(op, runtime, recDesc);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
new file mode 100644
index 0000000..456744e
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+
+public class OneToOneExchangePOperator extends AbstractExchangePOperator {
+
+    public OneToOneExchangePOperator() {
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.ONE_TO_ONE_EXCHANGE;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) {
+        IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec);
+        return new Pair<IConnectorDescriptor, TargetConstraint>(conn, TargetConstraint.SAME_COUNT);
+    }
+
+}


Mime
View raw message