asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ima...@apache.org
Subject [24/51] [partial] incubator-asterixdb-hyracks git commit: Change folder structure for Java repackage
Date Tue, 25 Aug 2015 16:41:37 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
new file mode 100644
index 0000000..2c4d89d
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+
+/**
+ * This will always be attached to an {@link OrderOperator} logical operator.
+ */
+
+public class StableSortPOperator extends AbstractStableSortPOperator {
+
+    private int maxNumberOfFrames;
+
+    public StableSortPOperator(int maxNumberOfFrames) {
+        super();
+        this.maxNumberOfFrames = maxNumberOfFrames;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.STABLE_SORT;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+        int n = sortColumns.length;
+        int[] sortFields = new int[n];
+        IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+
+        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+        INormalizedKeyComputerFactory nkcf = null;
+
+        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+        int i = 0;
+        for (OrderColumn oc : sortColumns) {
+            LogicalVariable var = oc.getColumn();
+            sortFields[i] = opSchema.findVariable(var);
+            Object type = env.getVarType(var);
+            OrderKind order = oc.getOrder();
+            if (i == 0 && nkcfProvider != null && type != null) {
+                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
+            }
+            IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+            comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
+            i++;
+        }
+
+        ExternalSortOperatorDescriptor sortOpDesc = new ExternalSortOperatorDescriptor(spec, maxNumberOfFrames,
+                sortFields, nkcf, comps, recDescriptor);
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, sortOpDesc);
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
new file mode 100644
index 0000000..63ddffe
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.StreamLimitRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class StreamLimitPOperator extends AbstractPhysicalOperator {
+
+    public StreamLimitPOperator() {
+
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.STREAM_LIMIT;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator limitOp = (AbstractLogicalOperator) op;
+        ILogicalOperator op2 = op.getInputs().get(0).getValue();
+        if (limitOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
+            //partitioning property: unpartitioned;  local property: whatever from the child
+            deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, op2
+                    .getDeliveredPhysicalProperties().getLocalProperties());
+        } else {
+            deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+        }
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        AbstractLogicalOperator limitOp = (AbstractLogicalOperator) op;
+        if (limitOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
+            StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+            pv[0] = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, null);
+            return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+        } else {
+            return emptyUnaryRequirements();
+        }
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        LimitOperator limit = (LimitOperator) op;
+        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+        IScalarEvaluatorFactory maxObjectsFact = expressionRuntimeProvider.createEvaluatorFactory(limit.getMaxObjects()
+                .getValue(), env, inputSchemas, context);
+        ILogicalExpression offsetExpr = limit.getOffset().getValue();
+        IScalarEvaluatorFactory offsetFact = (offsetExpr == null) ? null : expressionRuntimeProvider
+                .createEvaluatorFactory(offsetExpr, env, inputSchemas, context);
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+                context);
+        StreamLimitRuntimeFactory runtime = new StreamLimitRuntimeFactory(maxObjectsFact, offsetFact, null,
+                context.getBinaryIntegerInspectorFactory());
+        builder.contributeMicroOperator(limit, runtime, recDesc);
+        // and contribute one edge from its child
+        ILogicalOperator src = limit.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, limit, 0);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
new file mode 100644
index 0000000..2ad5fda
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.StreamProjectRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class StreamProjectPOperator extends AbstractPropagatePropertiesForUsedVariablesPOperator {
+
+    private boolean flushFramesRapidly;
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.STREAM_PROJECT;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        ProjectOperator project = (ProjectOperator) op;
+        int[] projectionList = new int[project.getVariables().size()];
+        int i = 0;
+        for (LogicalVariable v : project.getVariables()) {
+            int pos = inputSchemas[0].findVariable(v);
+            if (pos < 0) {
+                throw new AlgebricksException("Could not find variable " + v + ".");
+            }
+            projectionList[i++] = pos;
+        }
+        StreamProjectRuntimeFactory runtime = new StreamProjectRuntimeFactory(projectionList, flushFramesRapidly);
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+                context);
+        builder.contributeMicroOperator(project, runtime, recDesc);
+        ILogicalOperator src = project.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, project, 0);
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        ProjectOperator p = (ProjectOperator) op;
+        computeDeliveredPropertiesForUsedVariables(p, p.getVariables());
+    }
+
+    public void setRapidFrameFlush(boolean flushFramesRapidly) {
+        this.flushFramesRapidly = flushFramesRapidly;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
new file mode 100644
index 0000000..3999fa6
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class StreamSelectPOperator extends AbstractPhysicalOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.STREAM_SELECT;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        ILogicalOperator op2 = op.getInputs().get(0).getValue();
+        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        SelectOperator select = (SelectOperator) op;
+        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+        IScalarEvaluatorFactory cond = expressionRuntimeProvider.createEvaluatorFactory(select.getCondition()
+                .getValue(), context.getTypeEnvironment(op), inputSchemas, context);
+        StreamSelectRuntimeFactory runtime = new StreamSelectRuntimeFactory(cond, null,
+                context.getBinaryBooleanInspectorFactory(), select.getRetainNull(), inputSchemas[0].findVariable(select
+                        .getNullPlaceholderVariable()), context.getNullWriterFactory());
+        // contribute one Asterix framewriter
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+        builder.contributeMicroOperator(select, runtime, recDesc);
+        // and contribute one edge from its child
+        ILogicalOperator src = select.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, select, 0);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
new file mode 100644
index 0000000..e6a0be7
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.scripting.IScriptDescription;
+import edu.uci.ics.hyracks.algebricks.core.algebra.scripting.IScriptDescription.ScriptKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.scripting.StringStreamingScriptDescription;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.StringStreamingRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class StringStreamingScriptPOperator extends AbstractPropagatePropertiesForUsedVariablesPOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.STRING_STREAM_SCRIPT;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        ScriptOperator scriptOp = (ScriptOperator) op;
+        IScriptDescription scriptDesc = scriptOp.getScriptDescription();
+        if (scriptDesc.getKind() != ScriptKind.STRING_STREAMING) {
+            throw new IllegalStateException();
+        }
+        StringStreamingScriptDescription sssd = (StringStreamingScriptDescription) scriptDesc;
+        StringStreamingRuntimeFactory runtime = new StringStreamingRuntimeFactory(sssd.getCommand(),
+                sssd.getPrinterFactories(), sssd.getFieldDelimiter(), sssd.getParserFactory());
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+                context);
+        builder.contributeMicroOperator(scriptOp, runtime, recDesc);
+        // and contribute one edge from its child
+        ILogicalOperator src = scriptOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, scriptOp, 0);
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        ScriptOperator s = (ScriptOperator) op;
+        computeDeliveredPropertiesForUsedVariables(s, s.getInputVariables());
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
new file mode 100644
index 0000000..efc609a
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.SubplanRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class SubplanPOperator extends AbstractPhysicalOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.SUBPLAN;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        IPhysicalPropertiesVector childsProperties = op2.getPhysicalOperator().getDeliveredProperties();
+        List<ILocalStructuralProperty> propsLocal = new ArrayList<ILocalStructuralProperty>();
+        if (childsProperties.getLocalProperties() != null) {
+            propsLocal.addAll(childsProperties.getLocalProperties());
+        }
+        // ... get local properties for newly created variables...
+        SubplanOperator subplan = (SubplanOperator) op;
+        for (ILogicalPlan plan : subplan.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> r : plan.getRoots()) {
+                AbstractLogicalOperator rOp = (AbstractLogicalOperator) r.getValue();
+                propsLocal.addAll(rOp.getPhysicalOperator().getDeliveredProperties().getLocalProperties());
+            }
+        }
+
+        deliveredProperties = new StructuralPropertiesVector(childsProperties.getPartitioningProperty(), propsLocal);
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        SubplanOperator subplan = (SubplanOperator) op;
+        if (subplan.getNestedPlans().size() != 1) {
+            throw new NotImplementedException("Subplan currently works only for one nested plan with one root.");
+        }
+        AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], subplan, opSchema, context);
+        assert (subplans.length == 1);
+        AlgebricksPipeline np = subplans[0];
+        RecordDescriptor inputRecordDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op.getInputs().get(0).getValue()),
+                inputSchemas[0], context);
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[np.getOutputWidth()];
+        for (int i = 0; i < nullWriterFactories.length; i++) {
+            nullWriterFactories[i] = context.getNullWriterFactory();
+        }
+        SubplanRuntimeFactory runtime = new SubplanRuntimeFactory(np, nullWriterFactories, inputRecordDesc, null);
+
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+        builder.contributeMicroOperator(subplan, runtime, recDesc);
+
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
new file mode 100644
index 0000000..8a2ecf6
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class TokenizePOperator extends AbstractPhysicalOperator {
+
+    private final List<LogicalVariable> primaryKeys;
+    private final List<LogicalVariable> secondaryKeys;
+    private final IDataSourceIndex<?, ?> dataSourceIndex;
+
+    public TokenizePOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            IDataSourceIndex<?, ?> dataSourceIndex) {
+        this.primaryKeys = primaryKeys;
+        this.secondaryKeys = secondaryKeys;
+        this.dataSourceIndex = dataSourceIndex;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.TOKENIZE;
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        TokenizeOperator tokenizeOp = (TokenizeOperator) op;
+        if (tokenizeOp.getOperation() != Kind.INSERT || !tokenizeOp.isBulkload()) {
+            throw new AlgebricksException("Tokenize Operator only works when bulk-loading data.");
+        }
+
+        IMetadataProvider mp = context.getMetadataProvider();
+        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+        JobSpecification spec = builder.getJobSpec();
+        RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+                context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getTokenizerRuntime(
+                dataSourceIndex, propagatedSchema, inputSchemas, typeEnv, primaryKeys, secondaryKeys, null, inputDesc,
+                context, spec, true);
+        builder.contributeHyracksOperator(tokenizeOp, runtimeAndConstraints.first);
+        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+        ILogicalOperator src = tokenizeOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, tokenizeOp, 0);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
new file mode 100644
index 0000000..cda3b00
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;
+
+public class UnionAllPOperator extends AbstractPhysicalOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.UNION_ALL;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        IPartitioningProperty pp = op2.getDeliveredPhysicalProperties().getPartitioningProperty();
+        this.deliveredProperties = new StructuralPropertiesVector(pp, new ArrayList<ILocalStructuralProperty>(0));
+
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        StructuralPropertiesVector pv0 = StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
+        StructuralPropertiesVector pv1 = StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
+        return new PhysicalRequirements(new StructuralPropertiesVector[] { pv0, pv1 },
+                IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        UnionAllOperator unionOp = (UnionAllOperator) op;
+        int n = unionOp.getVariableMappings().size();
+        int[] leftColumns = new int[n];
+        int[] rightColumns = new int[n];
+        int i = 0;
+        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> t : unionOp.getVariableMappings()) {
+            int posLeft = inputSchemas[0].findVariable(t.first);
+            leftColumns[i] = posLeft;
+            int posRight = inputSchemas[1].findVariable(t.second);
+            rightColumns[i] = posRight;
+            ++i;
+        }
+
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+
+        // at algebricks level, union all only accepts two inputs, although at
+        // hyracks
+        // level, there is no restrictions
+        UnionAllOperatorDescriptor opDesc = new UnionAllOperatorDescriptor(spec, 2, recordDescriptor);
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+        ILogicalOperator src1 = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src1, 0, op, 0);
+        ILogicalOperator src2 = op.getInputs().get(1).getValue();
+        builder.contributeGraphEdge(src2, 0, op, 1);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
new file mode 100644
index 0000000..d60d4f7
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.UnnestRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class UnnestPOperator extends AbstractScanPOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.UNNEST;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        UnnestOperator unnest = (UnnestOperator) op;
+
+        int outCol = opSchema.findVariable(unnest.getVariable());
+        ILogicalExpression unnestExpr = unnest.getExpressionRef().getValue();
+        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+        boolean exit = false;
+        if (unnestExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            exit = true;
+        } else {
+            AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) unnestExpr;
+            if (fce.getKind() != FunctionKind.UNNEST) {
+                exit = true;
+            }
+        }
+        if (exit) {
+            throw new AlgebricksException("Unnest expression " + unnestExpr + " is not an unnesting function call.");
+        }
+        UnnestingFunctionCallExpression agg = (UnnestingFunctionCallExpression) unnestExpr;
+        IUnnestingEvaluatorFactory unnestingFactory = expressionRuntimeProvider.createUnnestingFunctionFactory(agg,
+                context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+
+        // for position offset
+        ILogicalExpression posOffsetExpr = unnest.getPositionOffsetExpr();
+        IScalarEvaluatorFactory posOffsetExprEvalFactory = null;
+        if (posOffsetExpr != null) {
+            posOffsetExprEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(posOffsetExpr,
+                    context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+        }
+
+        int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
+        UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, unnestingFactory, projectionList,
+                unnest.getPositionWriter(), posOffsetExprEvalFactory);
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+        builder.contributeMicroOperator(unnest, unnestRuntime, recDesc);
+        ILogicalOperator src = unnest.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, unnest, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
new file mode 100644
index 0000000..f399184
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class WriteResultPOperator extends AbstractPhysicalOperator {
+
+    private LogicalVariable payload;
+    private List<LogicalVariable> keys;
+    private IDataSource<?> dataSource;
+    private final List<LogicalVariable> additionalFilteringKeys;
+
+    public WriteResultPOperator(IDataSource<?> dataSource, LogicalVariable payload, List<LogicalVariable> keys,
+            List<LogicalVariable> additionalFilteringKeys) {
+        this.dataSource = dataSource;
+        this.payload = payload;
+        this.keys = keys;
+        this.additionalFilteringKeys = additionalFilteringKeys;
+    }
+
+    @Override
+    public String toString() {
+        return getOperatorTag().toString() + " " + payload + " " + keys;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.WRITE_RESULT;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
+        scanVariables.addAll(keys);
+        scanVariables.add(new LogicalVariable(-1));
+        IPhysicalPropertiesVector r = dataSource.getPropertiesProvider().computePropertiesVector(scanVariables);
+        IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
+        requirements[0] = r;
+        return new PhysicalRequirements(requirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        WriteResultOperator writeResultOp = (WriteResultOperator) op;
+        IMetadataProvider mp = context.getMetadataProvider();
+
+        JobSpecification spec = builder.getJobSpec();
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getWriteResultRuntime(
+                dataSource, propagatedSchema, keys, payload, additionalFilteringKeys, context, spec);
+
+        builder.contributeHyracksOperator(writeResultOp, runtimeAndConstraints.first);
+        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+        ILogicalOperator src = writeResultOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, writeResultOp, 0);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/ALogicalPlanImpl.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/ALogicalPlanImpl.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/ALogicalPlanImpl.java
new file mode 100644
index 0000000..8048bf1
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/ALogicalPlanImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.plan;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+
+/*
+ * Author: Guangqiang Li
+ * Created on Jul 9, 2009 
+ */
+public class ALogicalPlanImpl implements ILogicalPlan {
+    private List<Mutable<ILogicalOperator>> roots;
+
+    public ALogicalPlanImpl() {
+        this.roots = new ArrayList<Mutable<ILogicalOperator>>();
+    }
+
+    public ALogicalPlanImpl(List<Mutable<ILogicalOperator>> roots) {
+        this.roots = roots;
+    }
+
+    public ALogicalPlanImpl(Mutable<ILogicalOperator> root) {
+        roots = new ArrayList<Mutable<ILogicalOperator>>(1);
+        roots.add(root);
+    }
+
+    public List<Mutable<ILogicalOperator>> getRoots() {
+        return roots;
+    }
+
+    public void setRoots(List<Mutable<ILogicalOperator>> roots) {
+        this.roots = roots;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalExpressionPrettyPrintVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalExpressionPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalExpressionPrettyPrintVisitor.java
new file mode 100644
index 0000000..5f77352
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalExpressionPrettyPrintVisitor.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2014 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
+
+
+public class LogicalExpressionPrettyPrintVisitor implements ILogicalExpressionVisitor<String, Integer> {
+
+	@Override
+	public String visitConstantExpression(ConstantExpression expr, Integer indent)
+			throws AlgebricksException {
+		return expr.toString();
+	}
+
+	@Override
+	public String visitVariableReferenceExpression(
+			VariableReferenceExpression expr, Integer indent)
+			throws AlgebricksException {
+		return expr.toString();
+	}
+
+	@Override
+	public String visitAggregateFunctionCallExpression(
+			AggregateFunctionCallExpression expr, Integer indent)
+			throws AlgebricksException {
+		return expr.toString();
+	}
+
+	@Override
+	public String visitScalarFunctionCallExpression(
+			ScalarFunctionCallExpression expr, Integer indent)
+			throws AlgebricksException {
+		return expr.toString();
+	}
+
+	@Override
+	public String visitStatefulFunctionCallExpression(
+			StatefulFunctionCallExpression expr, Integer indent)
+			throws AlgebricksException {
+		return expr.toString();
+	}
+
+	@Override
+	public String visitUnnestingFunctionCallExpression(
+			UnnestingFunctionCallExpression expr, Integer indent)
+			throws AlgebricksException {
+		return expr.toString();
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
new file mode 100644
index 0000000..2c924d5
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -0,0 +1,431 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+public class LogicalOperatorPrettyPrintVisitor implements ILogicalOperatorVisitor<String, Integer> {
+
+    ILogicalExpressionVisitor<String, Integer> exprVisitor;
+
+    public LogicalOperatorPrettyPrintVisitor() {
+        exprVisitor = new LogicalExpressionPrettyPrintVisitor();
+    }
+
+    public LogicalOperatorPrettyPrintVisitor(ILogicalExpressionVisitor<String, Integer> exprVisitor) {
+        this.exprVisitor = exprVisitor;
+    }
+
+    @Override
+    public String visitAggregateOperator(AggregateOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("aggregate ").append(op.getVariables()).append(" <- ");
+        pprintExprList(op.getExpressions(), buffer, indent);
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitRunningAggregateOperator(RunningAggregateOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("running-aggregate ").append(op.getVariables()).append(" <- ");
+        pprintExprList(op.getExpressions(), buffer, indent);
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Integer indent) {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("empty-tuple-source");
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitGroupByOperator(GroupByOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("group by (");
+        pprintVeList(buffer, op.getGroupByList(), indent);
+        buffer.append(") decor (");
+        pprintVeList(buffer, op.getDecorList(), indent);
+        buffer.append(") {");
+        printNestedPlans(op, indent, buffer);
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitDistinctOperator(DistinctOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("distinct " + "(");
+        pprintExprList(op.getExpressions(), buffer, indent);
+        buffer.append(")");
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitInnerJoinOperator(InnerJoinOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("join (").append(op.getCondition().getValue().accept(exprVisitor, indent))
+                .append(")");
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("left outer join (")
+                .append(op.getCondition().getValue().accept(exprVisitor, indent)).append(")");
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Integer indent) {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("nested tuple source");
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitOrderOperator(OrderOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("order ");
+        for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) {
+            String fst;
+            switch (p.first.getKind()) {
+                case ASC: {
+                    fst = "ASC";
+                    break;
+                }
+                case DESC: {
+                    fst = "DESC";
+                    break;
+                }
+                default: {
+                    fst = p.first.getExpressionRef().toString();
+                }
+            }
+            buffer.append("(" + fst + ", " + p.second.getValue().accept(exprVisitor, indent) + ") ");
+        }
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitAssignOperator(AssignOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("assign ").append(op.getVariables()).append(" <- ");
+        pprintExprList(op.getExpressions(), buffer, indent);
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitWriteOperator(WriteOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("write ");
+        pprintExprList(op.getExpressions(), buffer, indent);
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitDistributeResultOperator(DistributeResultOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("distribute result ");
+        pprintExprList(op.getExpressions(), buffer, indent);
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitWriteResultOperator(WriteResultOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("load ").append(op.getDataSource()).append(" from ")
+                .append(op.getPayloadExpression().getValue().accept(exprVisitor, indent)).append(" partitioned by ");
+        pprintExprList(op.getKeyExpressions(), buffer, indent);
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitSelectOperator(SelectOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("select (").append(op.getCondition().getValue().accept(exprVisitor, indent))
+                .append(")");
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitProjectOperator(ProjectOperator op, Integer indent) {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("project " + "(" + op.getVariables() + ")");
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitPartitioningSplitOperator(PartitioningSplitOperator op, Integer indent)
+            throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("partitioning-split (");
+        pprintExprList(op.getExpressions(), buffer, indent);
+        buffer.append(")");
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitSubplanOperator(SubplanOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("subplan {");
+        printNestedPlans(op, indent, buffer);
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitUnionOperator(UnionAllOperator op, Integer indent) {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("union");
+        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> v : op.getVariableMappings()) {
+            buffer.append(" (" + v.first + ", " + v.second + ", " + v.third + ")");
+        }
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitUnnestOperator(UnnestOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("unnest " + op.getVariable());
+        if (op.getPositionalVariable() != null) {
+            buffer.append(" at " + op.getPositionalVariable());
+        }
+        buffer.append(" <- " + op.getExpressionRef().getValue().accept(exprVisitor, indent));
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitUnnestMapOperator(UnnestMapOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append(
+                "unnest-map " + op.getVariables() + " <- "
+                        + op.getExpressionRef().getValue().accept(exprVisitor, indent));
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitDataScanOperator(DataSourceScanOperator op, Integer indent) {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append(
+                "data-scan " + op.getProjectVariables() + "<-" + op.getVariables() + " <- " + op.getDataSource());
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitLimitOperator(LimitOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("limit " + op.getMaxObjects().getValue().accept(exprVisitor, indent));
+        ILogicalExpression offset = op.getOffset().getValue();
+        if (offset != null) {
+            buffer.append(", " + offset.accept(exprVisitor, indent));
+        }
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitExchangeOperator(ExchangeOperator op, Integer indent) {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("exchange ");
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitScriptOperator(ScriptOperator op, Integer indent) {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append(
+                "script (in: " + op.getInputVariables() + ") (out: " + op.getOutputVariables() + ")");
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitReplicateOperator(ReplicateOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("replicate ");
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitMaterializeOperator(MaterializeOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("materialize ");
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitInsertDeleteOperator(InsertDeleteOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        String header = op.getOperation() == Kind.INSERT ? "insert into " : "delete from ";
+        addIndent(buffer, indent).append(header).append(op.getDataSource()).append(" from ")
+                .append(op.getPayloadExpression().getValue().accept(exprVisitor, indent)).append(" partitioned by ");
+        pprintExprList(op.getPrimaryKeyExpressions(), buffer, indent);
+        if (op.isBulkload()) {
+            buffer.append(" [bulkload]");
+        }
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Integer indent)
+            throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        String header = op.getOperation() == Kind.INSERT ? "insert into " : "delete from ";
+        addIndent(buffer, indent).append(header).append(op.getIndexName()).append(" on ")
+                .append(op.getDataSourceIndex().getDataSource()).append(" from ");
+        pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
+        if (op.isBulkload()) {
+            buffer.append(" [bulkload]");
+        }
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitTokenizeOperator(TokenizeOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("tokenize ").append(op.getTokenizeVars()).append(" <- ");
+        pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitSinkOperator(SinkOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("sink");
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitExtensionOperator(ExtensionOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append(op.toString());
+        return buffer.toString();
+    }
+
+    protected static final StringBuilder addIndent(StringBuilder buffer, int level) {
+        for (int i = 0; i < level; ++i) {
+            buffer.append(' ');
+        }
+        return buffer;
+    }
+
+    protected void printNestedPlans(AbstractOperatorWithNestedPlans op, Integer indent, StringBuilder buffer)
+            throws AlgebricksException {
+        boolean first = true;
+        if (op.getNestedPlans().isEmpty()) {
+            buffer.append("}");
+        } else {
+            for (ILogicalPlan p : op.getNestedPlans()) {
+                // PrettyPrintUtil.indent(buffer, level + 10).append("var " +
+                // p.first + ":\n");
+                buffer.append("\n");
+                if (first) {
+                    first = false;
+                } else {
+                    addIndent(buffer, indent).append("       {\n");
+                }
+                PlanPrettyPrinter.printPlan(p, buffer, this, indent + 10);
+                addIndent(buffer, indent).append("       }");
+            }
+        }
+    }
+
+    protected void pprintExprList(List<Mutable<ILogicalExpression>> expressions, StringBuilder buffer, Integer indent)
+            throws AlgebricksException {
+        buffer.append("[");
+        boolean first = true;
+        for (Mutable<ILogicalExpression> exprRef : expressions) {
+            if (first) {
+                first = false;
+            } else {
+                buffer.append(", ");
+            }
+            buffer.append(exprRef.getValue().accept(exprVisitor, indent));
+        }
+        buffer.append("]");
+    }
+
+    protected void pprintVeList(StringBuilder sb, List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> vePairList,
+            Integer indent) throws AlgebricksException {
+        sb.append("[");
+        boolean fst = true;
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : vePairList) {
+            if (fst) {
+                fst = false;
+            } else {
+                sb.append("; ");
+            }
+            if (ve.first != null) {
+                sb.append(ve.first + " := " + ve.second);
+            } else {
+                sb.append(ve.second.getValue().accept(exprVisitor, indent));
+            }
+        }
+        sb.append("]");
+    }
+
+    @Override
+    public String visitExternalDataLookupOperator(ExternalDataLookupOperator op, Integer indent)
+            throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append(
+                "external-instant-lookup " + op.getVariables() + " <- " + op.getExpressionRef().getValue());
+        return buffer.toString();
+    }
+
+}


Mime
View raw message