asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ima...@apache.org
Subject [25/51] [partial] incubator-asterixdb-hyracks git commit: Change folder structure for Java repackage
Date Tue, 25 Aug 2015 16:41:38 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
new file mode 100644
index 0000000..a24c48d
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.SimpleAlgebricksAccumulatingAggregatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+
+public class PreSortedDistinctByPOperator extends AbstractPhysicalOperator {
+
+    private List<LogicalVariable> columnList;
+
+    public PreSortedDistinctByPOperator(List<LogicalVariable> columnList) {
+        this.columnList = columnList;
+    }
+
+    public void setDistinctByColumns(List<LogicalVariable> distinctByColumns) {
+        this.columnList = distinctByColumns;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        IPartitioningProperty pp = op2.getDeliveredPhysicalProperties().getPartitioningProperty();
+        List<ILocalStructuralProperty> propsLocal = op2.getDeliveredPhysicalProperties().getLocalProperties();
+        deliveredProperties = new StructuralPropertiesVector(pp, propsLocal);
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+        List<ILocalStructuralProperty> localProps = new ArrayList<ILocalStructuralProperty>();
+        List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
+        for (LogicalVariable column : columnList) {
+            orderColumns.add(new OrderColumn(column, OrderKind.ASC));
+        }
+        localProps.add(new LocalOrderProperty(orderColumns));
+        IPartitioningProperty pp = null;
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
+        if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
+            pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList), null);
+        }
+        pv[0] = new StructuralPropertiesVector(pp, localProps);
+        return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        int keys[] = JobGenHelper.variablesToFieldIndexes(columnList, inputSchemas[0]);
+        int sz = inputSchemas[0].getSize();
+        int fdSz = sz - columnList.size();
+        int[] fdColumns = new int[fdSz];
+        int j = 0;
+        for (LogicalVariable v : inputSchemas[0]) {
+            if (!columnList.contains(v)) {
+                fdColumns[j++] = inputSchemas[0].findVariable(v);
+            }
+        }
+        int[] keysAndDecs = new int[keys.length + fdColumns.length];
+        for (int i = 0; i < keys.length; i++) {
+            keysAndDecs[i] = keys[i];
+        }
+        for (int i = 0; i < fdColumns.length; i++) {
+            keysAndDecs[i + keys.length] = fdColumns[i];
+        }
+
+        IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
+                columnList, context.getTypeEnvironment(op), context);
+        IAggregateEvaluatorFactory[] aggFactories = new IAggregateEvaluatorFactory[] {};
+        IAggregatorDescriptorFactory aggregatorFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(
+                aggFactories, keysAndDecs);
+
+        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+                context);
+        /** make fd columns part of the key but the comparator only compares the distinct key columns */
+        PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keysAndDecs,
+                comparatorFactories, aggregatorFactory, recordDescriptor);
+
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
new file mode 100644
index 0000000..bcd998d
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.NestedPlansAccumulatingAggregatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.NestedPlansRunningAggregatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+
+public class PreclusteredGroupByPOperator extends AbstractPreclusteredGroupByPOperator {
+
+    public PreclusteredGroupByPOperator(List<LogicalVariable> columnList) {
+        super(columnList);
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.PRE_CLUSTERED_GROUP_BY;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        int keys[] = JobGenHelper.variablesToFieldIndexes(columnList, inputSchemas[0]);
+        GroupByOperator gby = (GroupByOperator) op;
+        int numFds = gby.getDecorList().size();
+        int fdColumns[] = new int[numFds];
+        int j = 0;
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList()) {
+            ILogicalExpression expr = p.second.getValue();
+            if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                throw new AlgebricksException("pre-sorted group-by expects variable references.");
+            }
+            VariableReferenceExpression v = (VariableReferenceExpression) expr;
+            LogicalVariable decor = v.getVariableReference();
+            fdColumns[j++] = inputSchemas[0].findVariable(decor);
+        }
+        // compile subplans and set the gby op. schema accordingly
+        AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], gby, opSchema, context);
+        IAggregatorDescriptorFactory aggregatorFactory;
+
+        if (((AbstractLogicalOperator) (gby.getNestedPlans().get(0).getRoots().get(0).getValue())).getOperatorTag() == LogicalOperatorTag.RUNNINGAGGREGATE) {
+            aggregatorFactory = new NestedPlansRunningAggregatorFactory(subplans, keys, fdColumns);
+        } else {
+            aggregatorFactory = new NestedPlansAccumulatingAggregatorFactory(subplans, keys, fdColumns);
+        }
+
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
+                columnList, context.getTypeEnvironment(op), context);
+        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+                context);
+
+        PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keys,
+                comparatorFactories, aggregatorFactory, recordDescriptor);
+
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
new file mode 100644
index 0000000..2478859
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
+
+public class RandomMergeExchangePOperator extends AbstractExchangePOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.RANDOM_MERGE_EXCHANGE;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        this.deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED,
+                new ArrayList<ILocalStructuralProperty>(0));
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) {
+        IConnectorDescriptor conn = new MToNReplicatingConnectorDescriptor(spec);
+        return new Pair<IConnectorDescriptor, TargetConstraint>(conn, TargetConstraint.ONE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
new file mode 100644
index 0000000..c633f7b
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.RandomPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+
+public class RandomPartitionPOperator extends AbstractExchangePOperator {
+
+    private final INodeDomain domain;
+
+    public RandomPartitionPOperator(INodeDomain domain) {
+        this.domain = domain;
+    }
+
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        Pair<IConnectorDescriptor, TargetConstraint> connPair = createConnectorDescriptor(builder.getJobSpec(), op,
+                opSchema, context);
+        builder.contributeConnectorWithTargetConstraint(op, connPair.first, connPair.second);
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
+        ITuplePartitionComputerFactory tpcf = new RandomPartitionComputerFactory(domain.cardinality());
+        MToNPartitioningConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
+        return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.RANDOM_PARTITION_EXCHANGE;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        this.deliveredProperties = new StructuralPropertiesVector(new RandomPartitioningProperty(domain), op2
+                .getDeliveredPhysicalProperties().getLocalProperties());
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        return emptyUnaryRequirements();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
new file mode 100644
index 0000000..99f68e9
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+
+public class RangePartitionMergePOperator extends AbstractExchangePOperator {
+
+    private List<OrderColumn> partitioningFields;
+    private INodeDomain domain;
+    private IRangeMap rangeMap;
+
+    public RangePartitionMergePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, IRangeMap rangeMap) {
+        this.partitioningFields = partitioningFields;
+        this.domain = domain;
+        this.rangeMap = rangeMap;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.RANGE_PARTITION_MERGE_EXCHANGE;
+    }
+
+    public List<OrderColumn> getPartitioningFields() {
+        return partitioningFields;
+    }
+
+    public INodeDomain getDomain() {
+        return domain;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        List<LogicalVariable> varList = new ArrayList<LogicalVariable>();
+        for (OrderColumn oc : partitioningFields) {
+            varList.add(oc.getColumn());
+        }
+        IPartitioningProperty p = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(varList), domain);
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        List<ILocalStructuralProperty> op2Locals = op2.getDeliveredPhysicalProperties().getLocalProperties();
+        List<ILocalStructuralProperty> locals = new ArrayList<ILocalStructuralProperty>();
+        for (ILocalStructuralProperty prop : op2Locals) {
+            if (prop.getPropertyType() == PropertyType.LOCAL_ORDER_PROPERTY) {
+                locals.add(prop);
+            } else {
+                break;
+            }
+        }
+
+        this.deliveredProperties = new StructuralPropertiesVector(p, locals);
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        List<ILocalStructuralProperty> orderProps = new LinkedList<ILocalStructuralProperty>();
+        List<OrderColumn> columns = new ArrayList<OrderColumn>();
+        for (OrderColumn oc : partitioningFields) {
+            LogicalVariable var = oc.getColumn();
+            columns.add(new OrderColumn(var, oc.getOrder()));
+        }
+        orderProps.add(new LocalOrderProperty(columns));
+        StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(null,
+                orderProps) };
+        return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
+        int n = partitioningFields.size();
+        int[] sortFields = new int[n];
+        IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+
+        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+        INormalizedKeyComputerFactory nkcf = null;
+
+        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+        int i = 0;
+        for (OrderColumn oc : partitioningFields) {
+            LogicalVariable var = oc.getColumn();
+            sortFields[i] = opSchema.findVariable(var);
+            Object type = env.getVarType(var);
+            OrderKind order = oc.getOrder();
+            if (i == 0 && nkcfProvider != null && type != null) {
+                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
+            }
+            IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+            comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
+            i++;
+        }
+        ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
+        IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps, nkcf);
+        return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+    }
+
+    @Override
+    public String toString() {
+        return getOperatorTag().toString() + " " + partitioningFields + " SPLIT COUNT:" + rangeMap.getSplitCount();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
new file mode 100644
index 0000000..66955e9
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+
+public class RangePartitionPOperator extends AbstractExchangePOperator {
+
+    private List<OrderColumn> partitioningFields;
+    private INodeDomain domain;
+    private IRangeMap rangeMap;
+
+    public RangePartitionPOperator(List<OrderColumn> partitioningFields, INodeDomain domain, IRangeMap rangeMap) {
+        this.partitioningFields = partitioningFields;
+        this.domain = domain;
+        this.rangeMap = rangeMap;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.RANGE_PARTITION_EXCHANGE;
+    }
+
+    public List<OrderColumn> getPartitioningFields() {
+        return partitioningFields;
+    }
+
+    public INodeDomain getDomain() {
+        return domain;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        IPartitioningProperty p = new OrderedPartitionedProperty(new ArrayList<OrderColumn>(partitioningFields), domain);
+        this.deliveredProperties = new StructuralPropertiesVector(p, new LinkedList<ILocalStructuralProperty>());
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
+        int n = partitioningFields.size();
+        int[] sortFields = new int[n];
+        IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+
+        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+        INormalizedKeyComputerFactory nkcf = null;
+
+        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+        int i = 0;
+        for (OrderColumn oc : partitioningFields) {
+            LogicalVariable var = oc.getColumn();
+            sortFields[i] = opSchema.findVariable(var);
+            Object type = env.getVarType(var);
+            OrderKind order = oc.getOrder();
+            if (i == 0 && nkcfProvider != null && type != null) {
+                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
+            }
+            IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+            comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
+            i++;
+        }
+        ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
+        IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
+        return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+    }
+
+    @Override
+    public String toString() {
+        return getOperatorTag().toString() + " " + partitioningFields + " SPLIT COUNT:" + rangeMap.getSplitCount();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
new file mode 100644
index 0000000..d8f7d99
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+
+public class ReplicatePOperator extends AbstractPhysicalOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.SPLIT;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+
+        ReplicateOperator rop = (ReplicateOperator) op;
+        int outputArity = rop.getOutputArity();
+        boolean[] outputMaterializationFlags = rop.getOutputMaterializationFlags();
+
+        SplitOperatorDescriptor splitOpDesc = new SplitOperatorDescriptor(spec, recDescriptor, outputArity, outputMaterializationFlags);
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc);
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+
+    @Override
+    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+        int[] inputDependencyLabels = new int[] { 0 };
+        ReplicateOperator rop = (ReplicateOperator) op;
+        int[] outputDependencyLabels = new int[rop.getOutputArity()];
+        // change the labels of outputs that requires materialization to 1
+        boolean[] outputMaterializationFlags = rop.getOutputMaterializationFlags();
+        for (int i = 0; i < rop.getOutputArity(); i++) {
+            if (outputMaterializationFlags[i]) {
+                outputDependencyLabels[i] = 1;
+            }
+        }
+        return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
new file mode 100644
index 0000000..3b2387a
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.RunningAggregateRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class RunningAggregatePOperator extends AbstractPhysicalOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.RUNNING_AGGREGATE;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        RunningAggregateOperator ragg = (RunningAggregateOperator) op;
+        List<LogicalVariable> variables = ragg.getVariables();
+        List<Mutable<ILogicalExpression>> expressions = ragg.getExpressions();
+        int[] outColumns = new int[variables.size()];
+        for (int i = 0; i < outColumns.length; i++) {
+            outColumns[i] = opSchema.findVariable(variables.get(i));
+        }
+        IRunningAggregateEvaluatorFactory[] runningAggFuns = new IRunningAggregateEvaluatorFactory[expressions.size()];
+        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+        for (int i = 0; i < runningAggFuns.length; i++) {
+            StatefulFunctionCallExpression expr = (StatefulFunctionCallExpression) expressions.get(i).getValue();
+            runningAggFuns[i] = expressionRuntimeProvider.createRunningAggregateFunctionFactory(expr,
+                    context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+        }
+
+        // TODO push projections into the operator
+        int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
+
+        RunningAggregateRuntimeFactory runtime = new RunningAggregateRuntimeFactory(outColumns, runningAggFuns,
+                projectionList);
+
+        // contribute one Asterix framewriter
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+        builder.contributeMicroOperator(ragg, runtime, recDesc);
+        // and contribute one edge from its child
+        ILogicalOperator src = ragg.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, ragg, 0);
+
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
new file mode 100644
index 0000000..ba299ac
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.misc.SinkOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;
+
+public class SinkPOperator extends AbstractPhysicalOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.SINK;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op2;
+        List<ILocalStructuralProperty> propsLocal = new ArrayList<ILocalStructuralProperty>();
+        IPhysicalPropertiesVector childsProperties = null;
+
+        for (int i = 0; i < op.getInputs().size(); i++) {
+            op2 = (AbstractLogicalOperator) op.getInputs().get(i).getValue();
+            childsProperties = op2.getPhysicalOperator().getDeliveredProperties();
+            if (childsProperties.getLocalProperties() != null) {
+                propsLocal.addAll(childsProperties.getLocalProperties());
+            }
+        }
+
+        deliveredProperties = new StructuralPropertiesVector(childsProperties.getPartitioningProperty(), propsLocal);
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+      return emptyUnaryRequirements(op.getInputs().size());
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+
+        SinkOperatorDescriptor opDesc = new SinkOperatorDescriptor(spec, op.getInputs().size());
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+        for (int i = 0; i < op.getInputs().size(); i++) {
+            builder.contributeGraphEdge(op.getInputs().get(i).getValue(), 0, op, i);
+        }
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
new file mode 100644
index 0000000..9105c1f
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class SinkWritePOperator extends AbstractPhysicalOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.SINK_WRITE;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        ILogicalOperator op2 = op.getInputs().get(0).getValue();
+        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        WriteOperator write = (WriteOperator) op;
+        IDataSink sink = write.getDataSink();
+        IPartitioningProperty pp = sink.getPartitioningProperty();
+        StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, null) };
+        return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        WriteOperator write = (WriteOperator) op;
+        int[] columns = new int[write.getExpressions().size()];
+        int i = 0;
+        for (Mutable<ILogicalExpression> exprRef : write.getExpressions()) {
+            ILogicalExpression expr = exprRef.getValue();
+            if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                throw new NotImplementedException("Only writing variable expressions is supported.");
+            }
+            VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
+            LogicalVariable v = varRef.getVariableReference();
+            columns[i++] = inputSchemas[0].findVariable(v);
+        }
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+        RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0],
+                context);
+
+        IPrinterFactory[] pf = JobGenHelper.mkPrinterFactories(inputSchemas[0], context.getTypeEnvironment(op),
+                context, columns);
+
+        IMetadataProvider<?, ?> mp = context.getMetadataProvider();
+
+        Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> runtime = mp.getWriteFileRuntime(write.getDataSink(),
+                columns, pf, inputDesc);
+
+        builder.contributeMicroOperator(write, runtime.first, recDesc, runtime.second);
+        ILogicalOperator src = write.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, write, 0);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
new file mode 100644
index 0000000..6ad1fab
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IPartialAggregationTypeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalGroupingProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.OperatorSchemaImpl;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.SimpleAlgebricksAccumulatingAggregatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.sort.SortGroupByOperatorDescriptor;
+
+public class SortGroupByPOperator extends AbstractPhysicalOperator {
+
+    private final int frameLimit;
+    private final OrderColumn[] orderColumns;
+    private final List<LogicalVariable> columnSet = new ArrayList<LogicalVariable>();
+
+    public SortGroupByPOperator(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList, int frameLimit,
+            OrderColumn[] orderColumns) {
+        this.frameLimit = frameLimit;
+        this.orderColumns = orderColumns;
+        computeColumnSet(gbyList);
+    }
+
+    private void computeColumnSet(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList) {
+        columnSet.clear();
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
+            ILogicalExpression expr = p.second.getValue();
+            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                VariableReferenceExpression v = (VariableReferenceExpression) expr;
+                columnSet.add(v.getVariableReference());
+            }
+        }
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.SORT_GROUP_BY;
+    }
+
+    @Override
+    public String toString() {
+        return getOperatorTag().toString() + columnSet;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    private List<LogicalVariable> getGbyColumns() {
+        return columnSet;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        List<ILocalStructuralProperty> propsLocal = new LinkedList<ILocalStructuralProperty>();
+
+        GroupByOperator gOp = (GroupByOperator) op;
+        Set<LogicalVariable> columnSet = new ListSet<LogicalVariable>();
+        List<OrderColumn> ocs = new ArrayList<OrderColumn>();
+
+        if (!columnSet.isEmpty()) {
+            propsLocal.add(new LocalGroupingProperty(columnSet));
+        }
+        for (OrderColumn oc : orderColumns) {
+            ocs.add(oc);
+        }
+        propsLocal.add(new LocalOrderProperty(ocs));
+        for (ILogicalPlan p : gOp.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> r : p.getRoots()) {
+                ILogicalOperator rOp = r.getValue();
+                propsLocal.addAll(rOp.getDeliveredPhysicalProperties().getLocalProperties());
+            }
+        }
+
+        ILogicalOperator op2 = op.getInputs().get(0).getValue();
+        IPhysicalPropertiesVector childProp = op2.getDeliveredPhysicalProperties();
+        deliveredProperties = new StructuralPropertiesVector(childProp.getPartitioningProperty(), propsLocal);
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        List<LogicalVariable> gbyCols = getGbyColumns();
+        int keys[] = JobGenHelper.variablesToFieldIndexes(gbyCols, inputSchemas[0]);
+        GroupByOperator gby = (GroupByOperator) op;
+        int numFds = gby.getDecorList().size();
+        int fdColumns[] = new int[numFds];
+        int j = 0;
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList()) {
+            ILogicalExpression expr = p.second.getValue();
+            if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                throw new AlgebricksException("Sort group-by expects variable references.");
+            }
+            VariableReferenceExpression v = (VariableReferenceExpression) expr;
+            LogicalVariable decor = v.getVariableReference();
+            fdColumns[j++] = inputSchemas[0].findVariable(decor);
+        }
+
+        if (gby.getNestedPlans().size() != 1) {
+            throw new AlgebricksException(
+                    "Sort group-by currently works only for one nested plan with one root containing"
+                            + "an aggregate and a nested-tuple-source.");
+        }
+        ILogicalPlan p0 = gby.getNestedPlans().get(0);
+        if (p0.getRoots().size() != 1) {
+            throw new AlgebricksException(
+                    "Sort group-by currently works only for one nested plan with one root containing"
+                            + "an aggregate and a nested-tuple-source.");
+        }
+        Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
+        AggregateOperator aggOp = (AggregateOperator) r0.getValue();
+
+        IPartialAggregationTypeComputer partialAggregationTypeComputer = context.getPartialAggregationTypeComputer();
+        List<Object> intermediateTypes = new ArrayList<Object>();
+        int n = aggOp.getExpressions().size();
+        IAggregateEvaluatorFactory[] aff = new IAggregateEvaluatorFactory[n];
+        int i = 0;
+        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+        IVariableTypeEnvironment aggOpInputEnv = context.getTypeEnvironment(aggOp.getInputs().get(0).getValue());
+        IVariableTypeEnvironment outputEnv = context.getTypeEnvironment(op);
+        for (Mutable<ILogicalExpression> exprRef : aggOp.getExpressions()) {
+            AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) exprRef.getValue();
+            aff[i++] = expressionRuntimeProvider.createAggregateFunctionFactory(aggFun, aggOpInputEnv, inputSchemas,
+                    context);
+            intermediateTypes.add(partialAggregationTypeComputer.getType(aggFun, aggOpInputEnv,
+                    context.getMetadataProvider()));
+        }
+
+        int[] keyAndDecFields = new int[keys.length + fdColumns.length];
+        for (i = 0; i < keys.length; ++i) {
+            keyAndDecFields[i] = keys[i];
+        }
+        for (i = 0; i < fdColumns.length; i++) {
+            keyAndDecFields[keys.length + i] = fdColumns[i];
+        }
+
+        List<LogicalVariable> keyAndDecVariables = new ArrayList<LogicalVariable>();
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getGroupByList()) {
+            keyAndDecVariables.add(p.first);
+        }
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList()) {
+            keyAndDecVariables.add(GroupByOperator.getDecorVariable(p));
+        }
+
+        for (LogicalVariable var : keyAndDecVariables) {
+            aggOpInputEnv.setVarType(var, outputEnv.getVarType(var));
+        }
+
+        compileSubplans(inputSchemas[0], gby, opSchema, context);
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+
+        IBinaryComparatorFactory[] compFactories = new IBinaryComparatorFactory[gbyCols.size()];
+        IBinaryComparatorFactoryProvider bcfProvider = context.getBinaryComparatorFactoryProvider();
+        i = 0;
+        for (LogicalVariable v : gbyCols) {
+            Object type = aggOpInputEnv.getVarType(v);
+            if (orderColumns[i].getOrder() == OrderKind.ASC) {
+                compFactories[i] = bcfProvider.getBinaryComparatorFactory(type, true);
+            } else {
+                compFactories[i] = bcfProvider.getBinaryComparatorFactory(type, false);
+            }
+            i++;
+        }
+        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+                context);
+
+        IAggregateEvaluatorFactory[] merges = new IAggregateEvaluatorFactory[n];
+        List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
+        IOperatorSchema[] localInputSchemas = new IOperatorSchema[1];
+        localInputSchemas[0] = new OperatorSchemaImpl();
+        for (i = 0; i < n; i++) {
+            AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) aggOp.getMergeExpressions()
+                    .get(i).getValue();
+            aggFun.getUsedVariables(usedVars);
+        }
+        i = 0;
+        for (Object type : intermediateTypes) {
+            aggOpInputEnv.setVarType(usedVars.get(i++), type);
+        }
+        for (LogicalVariable keyVar : keyAndDecVariables) {
+            localInputSchemas[0].addVariable(keyVar);
+        }
+        for (LogicalVariable usedVar : usedVars) {
+            localInputSchemas[0].addVariable(usedVar);
+        }
+        for (i = 0; i < n; i++) {
+            AggregateFunctionCallExpression mergeFun = (AggregateFunctionCallExpression) aggOp.getMergeExpressions()
+                    .get(i).getValue();
+            merges[i] = expressionRuntimeProvider.createAggregateFunctionFactory(mergeFun, aggOpInputEnv,
+                    localInputSchemas, context);
+        }
+        RecordDescriptor partialAggRecordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                localInputSchemas[0], context);
+
+        IAggregatorDescriptorFactory aggregatorFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(aff,
+                keyAndDecFields);
+        IAggregatorDescriptorFactory mergeFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(merges,
+                keyAndDecFields);
+
+        INormalizedKeyComputerFactory normalizedKeyFactory = null;
+        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+        if (nkcfProvider == null) {
+            normalizedKeyFactory = null;
+        }
+        Object type = aggOpInputEnv.getVarType(gbyCols.get(0));
+        normalizedKeyFactory = orderColumns[0].getOrder() == OrderKind.ASC ? nkcfProvider
+                .getNormalizedKeyComputerFactory(type, true) : nkcfProvider
+                .getNormalizedKeyComputerFactory(type, false);
+        SortGroupByOperatorDescriptor gbyOpDesc = new SortGroupByOperatorDescriptor(spec, frameLimit, keys,
+                keyAndDecFields, normalizedKeyFactory, compFactories, aggregatorFactory, mergeFactory,
+                partialAggRecordDescriptor, recordDescriptor, false);
+
+        contributeOpDesc(builder, gby, gbyOpDesc);
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+
+    @Override
+    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+        int[] inputDependencyLabels = new int[] { 0 };
+        int[] outputDependencyLabels = new int[] { 1 };
+        return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
new file mode 100644
index 0000000..7f29375
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+
+public class SortMergeExchangePOperator extends AbstractExchangePOperator {
+
+    private final OrderColumn[] sortColumns;
+
+    public SortMergeExchangePOperator(OrderColumn[] sortColumns) {
+        this.sortColumns = sortColumns;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.SORT_MERGE_EXCHANGE;
+    }
+
+    public OrderColumn[] getSortColumns() {
+        return sortColumns;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(getOperatorTag());
+        sb.append(" [");
+        sb.append(sortColumns[0]);
+        for (int i = 1; i < sortColumns.length; i++) {
+            sb.append(", " + sortColumns[i]);
+        }
+        sb.append(" ]");
+        return sb.toString();
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator inp1 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        IPhysicalPropertiesVector pv1 = inp1.getDeliveredPhysicalProperties();
+        if (pv1 == null) {
+            inp1.computeDeliveredPhysicalProperties(context);
+            pv1 = inp1.getDeliveredPhysicalProperties();
+        }
+
+        List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
+        List<ILocalStructuralProperty> localProps = new ArrayList<ILocalStructuralProperty>(sortColumns.length);
+        for (ILocalStructuralProperty prop : pv1.getLocalProperties()) {
+            if (prop.getPropertyType() == PropertyType.LOCAL_ORDER_PROPERTY) {
+                LocalOrderProperty lop = (LocalOrderProperty) prop;
+                for (OrderColumn oc : lop.getOrderColumns()) {
+                    if (oc.equals(sortColumns[orderColumns.size()])) {
+                        orderColumns.add(oc);
+                        if (orderColumns.size() == sortColumns.length) {
+                            break;
+                        }
+                    } else {
+                        break;
+                    }
+                }
+            } else {
+                continue;
+            }
+        }
+        if (orderColumns.size() > 0) {
+            localProps.add(new LocalOrderProperty(orderColumns));
+        }
+        this.deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, localProps);
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        List<ILocalStructuralProperty> localProps = new ArrayList<ILocalStructuralProperty>(sortColumns.length);
+        localProps.add(new LocalOrderProperty(Arrays.asList(sortColumns)));
+        StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(null,
+                localProps) };
+        return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
+        int n = sortColumns.length;
+        int[] sortFields = new int[n];
+        IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+        IBinaryHashFunctionFactory[] hashFuns = new IBinaryHashFunctionFactory[n];
+        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+
+        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+        INormalizedKeyComputerFactory nkcf = null;
+
+        for (int i = 0; i < n; i++) {
+            sortFields[i] = opSchema.findVariable(sortColumns[i].getColumn());
+            Object type = env.getVarType(sortColumns[i].getColumn());
+            IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+            comps[i] = bcfp.getBinaryComparatorFactory(type, sortColumns[i].getOrder() == OrderKind.ASC);
+            IBinaryHashFunctionFactoryProvider bhffp = context.getBinaryHashFunctionFactoryProvider();
+            hashFuns[i] = bhffp.getBinaryHashFunctionFactory(type);
+            if (i == 0 && nkcfProvider != null && type != null) {
+                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, sortColumns[i].getOrder() == OrderKind.ASC);
+            }
+        }
+        ITuplePartitionComputerFactory tpcf = new FieldHashPartitionComputerFactory(sortFields, hashFuns);
+        IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps, nkcf);
+        return new Pair<IConnectorDescriptor, TargetConstraint>(conn, TargetConstraint.ONE);
+    }
+
+}


Mime
View raw message