asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [2/3] incubator-asterixdb-hyracks git commit: Add Support for Upsert Operation
Date Mon, 01 Feb 2016 08:29:40 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
new file mode 100644
index 0000000..4702361
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * Physical operator that applies an insert, delete or upsert to an index of a data source.
+ * <p>
+ * During job generation it obtains the matching index runtime from the {@link IMetadataProvider},
+ * based on the operation kind of the logical {@link IndexInsertDeleteUpsertOperator}.
+ */
+public class IndexInsertDeleteUpsertPOperator extends AbstractPhysicalOperator {
+
+    private final List<LogicalVariable> primaryKeys;
+    private final List<LogicalVariable> secondaryKeys;
+    private final ILogicalExpression filterExpr;
+    private final IDataSourceIndex<?, ?> dataSourceIndex;
+    private final List<LogicalVariable> additionalFilteringKeys;
+    private final List<LogicalVariable> prevSecondaryKeys;
+    private final LogicalVariable prevAdditionalFilteringKey;
+
+    /**
+     * Creates the operator.
+     *
+     * @param primaryKeys
+     *            primary key variables; also used to compute the properties required from the input
+     * @param secondaryKeys
+     *            secondary key variables handed to the metadata provider
+     * @param additionalFilteringKeys
+     *            additional filtering key variables handed to the metadata provider
+     * @param filterExpr
+     *            holder of the filter expression; when {@code null}, no filter expression is kept
+     * @param dataSourceIndex
+     *            the index the operation is applied to
+     * @param prevSecondaryKeys
+     *            previous secondary key variables; only forwarded to the upsert runtime
+     * @param prevAdditionalFilteringKey
+     *            previous additional filtering key variable; only forwarded to the upsert runtime
+     */
+    public IndexInsertDeleteUpsertPOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            List<LogicalVariable> additionalFilteringKeys, Mutable<ILogicalExpression> filterExpr,
+            IDataSourceIndex<?, ?> dataSourceIndex, List<LogicalVariable> prevSecondaryKeys,
+            LogicalVariable prevAdditionalFilteringKey) {
+        this.primaryKeys = primaryKeys;
+        this.secondaryKeys = secondaryKeys;
+        this.filterExpr = filterExpr == null ? null : filterExpr.getValue();
+        this.dataSourceIndex = dataSourceIndex;
+        this.additionalFilteringKeys = additionalFilteringKeys;
+        this.prevSecondaryKeys = prevSecondaryKeys;
+        this.prevAdditionalFilteringKey = prevAdditionalFilteringKey;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.INDEX_INSERT_DELETE;
+    }
+
+    /**
+     * Delivers a clone of the physical properties delivered by the operator's first input.
+     */
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    /**
+     * Requires from the single input the properties the data source computes for the primary keys plus one
+     * extra variable, with all local properties removed, using {@code NO_COORDINATION}.
+     */
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
+        scanVariables.addAll(primaryKeys);
+        // NOTE(review): the variable with id -1 looks like a placeholder for a non-key field -- confirm.
+        scanVariables.add(new LogicalVariable(-1));
+        IPhysicalPropertiesVector r = dataSourceIndex.getDataSource().getPropertiesProvider()
+                .computePropertiesVector(scanVariables);
+        r.getLocalProperties().clear();
+        IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
+        requirements[0] = r;
+        return new PhysicalRequirements(requirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    /**
+     * Adds the Hyracks operator for this index modification to the job, together with its partition
+     * constraint and the edge from the input operator.
+     *
+     * @throws AlgebricksException
+     *             if the logical operator's kind is not INSERT, DELETE or UPSERT
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+                    throws AlgebricksException {
+        IndexInsertDeleteUpsertOperator insertDeleteUpsertOp = (IndexInsertDeleteUpsertOperator) op;
+        IMetadataProvider mp = context.getMetadataProvider();
+
+        JobSpecification spec = builder.getJobSpec();
+        RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+                context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints;
+        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(insertDeleteUpsertOp);
+        Kind operation = insertDeleteUpsertOp.getOperation();
+        if (operation == Kind.INSERT) {
+            runtimeAndConstraints = mp.getIndexInsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
+                    primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec, false);
+        } else if (operation == Kind.DELETE) {
+            runtimeAndConstraints = mp.getIndexDeleteRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
+                    primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec);
+        } else if (operation == Kind.UPSERT) {
+            runtimeAndConstraints = mp.getIndexUpsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
+                    primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, prevSecondaryKeys,
+                    prevAdditionalFilteringKey, inputDesc, context, spec);
+        } else {
+            // Fail with a descriptive error instead of dereferencing a null pair below.
+            throw new AlgebricksException("Unsupported Operation " + operation);
+        }
+        builder.contributeHyracksOperator(insertDeleteUpsertOp, runtimeAndConstraints.first);
+        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+        ILogicalOperator src = insertDeleteUpsertOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, insertDeleteUpsertOp, 0);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+
+    /** Returns the previous secondary key variables given at construction. */
+    public List<LogicalVariable> getPrevSecondaryKeys() {
+        return prevSecondaryKeys;
+    }
+
+    /** Returns the single previous additional filtering key variable given at construction. */
+    public LogicalVariable getPrevFilteringKeys() {
+        return prevAdditionalFilteringKey;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
deleted file mode 100644
index c774f44..0000000
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.physical;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.JobSpecification;
-
-@SuppressWarnings("rawtypes")
-public class InsertDeletePOperator extends AbstractPhysicalOperator {
-
-    private LogicalVariable payload;
-    private List<LogicalVariable> keys;
-    private IDataSource<?> dataSource;
-    private final List<LogicalVariable> additionalFilteringKeys;
-
-    public InsertDeletePOperator(LogicalVariable payload, List<LogicalVariable> keys,
-            List<LogicalVariable> additionalFilteringKeys, IDataSource dataSource) {
-        this.payload = payload;
-        this.keys = keys;
-        this.dataSource = dataSource;
-        this.additionalFilteringKeys = additionalFilteringKeys;
-    }
-
-    @Override
-    public PhysicalOperatorTag getOperatorTag() {
-        return PhysicalOperatorTag.INSERT_DELETE;
-    }
-
-    @Override
-    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
-    }
-
-    @Override
-    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
-        List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
-        scanVariables.addAll(keys);
-        scanVariables.add(new LogicalVariable(-1));
-        IPhysicalPropertiesVector r = dataSource.getPropertiesProvider().computePropertiesVector(scanVariables);
-        r.getLocalProperties().clear();
-        IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
-        requirements[0] = r;
-        return new PhysicalRequirements(requirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
-            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
-        InsertDeleteOperator insertDeleteOp = (InsertDeleteOperator) op;
-        IMetadataProvider mp = context.getMetadataProvider();
-        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
-        JobSpecification spec = builder.getJobSpec();
-        RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
-                context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
-
-        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = null;
-        if (insertDeleteOp.getOperation() == Kind.INSERT) {
-            runtimeAndConstraints = mp.getInsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
-                    additionalFilteringKeys, inputDesc, context, spec, false);
-        } else {
-            runtimeAndConstraints = mp.getDeleteRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
-                    additionalFilteringKeys, inputDesc, context, spec);
-        }
-
-        builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first);
-        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
-        ILogicalOperator src = insertDeleteOp.getInputs().get(0).getValue();
-        builder.contributeGraphEdge(src, 0, insertDeleteOp, 0);
-    }
-
-    @Override
-    public boolean isMicroOperator() {
-        return false;
-    }
-
-    @Override
-    public boolean expensiveThanMaterialization() {
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
new file mode 100644
index 0000000..d844f37
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * Physical operator that inserts into, deletes from or upserts into a data source.
+ * <p>
+ * The operation kind is fixed at construction; during job generation the matching runtime is obtained
+ * from the {@link IMetadataProvider}.
+ */
+@SuppressWarnings("rawtypes")
+public class InsertDeleteUpsertPOperator extends AbstractPhysicalOperator {
+
+    private LogicalVariable payload;
+    private List<LogicalVariable> keys;
+    private IDataSource<?> dataSource;
+    private final List<LogicalVariable> additionalFilteringKeys;
+    private final Kind operation;
+    private final LogicalVariable prevPayload;
+
+    /**
+     * Creates the operator.
+     *
+     * @param payload
+     *            variable holding the payload handed to the metadata provider
+     * @param keys
+     *            key variables; also used to compute the properties required from the input
+     * @param additionalFilteringKeys
+     *            additional filtering key variables handed to the metadata provider
+     * @param dataSource
+     *            the data source the operation is applied to
+     * @param operation
+     *            which modification to perform
+     * @param prevPayload
+     *            previous payload variable; only forwarded to the upsert runtime
+     */
+    public InsertDeleteUpsertPOperator(LogicalVariable payload, List<LogicalVariable> keys,
+            List<LogicalVariable> additionalFilteringKeys, IDataSource dataSource, Kind operation,
+            LogicalVariable prevPayload) {
+        this.operation = operation;
+        this.prevPayload = prevPayload;
+        this.additionalFilteringKeys = additionalFilteringKeys;
+        this.dataSource = dataSource;
+        this.keys = keys;
+        this.payload = payload;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.INSERT_DELETE;
+    }
+
+    /**
+     * Delivers a clone of the physical properties delivered by the first input.
+     * NOTE(review): presumably those are partitioned on the primary key, given what is required from the
+     * input -- confirm.
+     */
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        ILogicalOperator child = op.getInputs().get(0).getValue();
+        deliveredProperties = ((AbstractLogicalOperator) child).getDeliveredPhysicalProperties().clone();
+    }
+
+    /**
+     * Requires from the single input the properties the data source computes for the keys plus one extra
+     * variable, with all local properties removed, using {@code NO_COORDINATION}.
+     */
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>(keys);
+        scanVariables.add(new LogicalVariable(-1));
+        IPhysicalPropertiesVector required = dataSource.getPropertiesProvider()
+                .computePropertiesVector(scanVariables);
+        required.getLocalProperties().clear();
+        return new PhysicalRequirements(new IPhysicalPropertiesVector[] { required },
+                IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    /**
+     * Adds the Hyracks operator for this modification to the job, together with its partition constraint
+     * and the edge from the input operator.
+     *
+     * @throws AlgebricksException
+     *             if the operation kind is not INSERT, DELETE or UPSERT
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+                    throws AlgebricksException {
+        InsertDeleteUpsertOperator insertDeleteOp = (InsertDeleteUpsertOperator) op;
+        IMetadataProvider mp = context.getMetadataProvider();
+        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+        JobSpecification spec = builder.getJobSpec();
+        RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+                context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = createRuntime(mp,
+                propagatedSchema, typeEnv, inputDesc, context, spec);
+        IOperatorDescriptor runtime = runtimeAndConstraints.first;
+        builder.contributeHyracksOperator(insertDeleteOp, runtime);
+        builder.contributeAlgebricksPartitionConstraint(runtime, runtimeAndConstraints.second);
+        builder.contributeGraphEdge(insertDeleteOp.getInputs().get(0).getValue(), 0, insertDeleteOp, 0);
+    }
+
+    /** Asks the metadata provider for the runtime matching {@code operation}; other kinds are rejected. */
+    @SuppressWarnings("unchecked")
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> createRuntime(IMetadataProvider mp,
+            IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, RecordDescriptor inputDesc,
+            JobGenContext context, JobSpecification spec) throws AlgebricksException {
+        if (operation == Kind.INSERT) {
+            return mp.getInsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
+                    additionalFilteringKeys, inputDesc, context, spec, false);
+        }
+        if (operation == Kind.DELETE) {
+            return mp.getDeleteRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
+                    additionalFilteringKeys, inputDesc, context, spec);
+        }
+        if (operation == Kind.UPSERT) {
+            return mp.getUpsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
+                    additionalFilteringKeys, prevPayload, inputDesc, context, spec);
+        }
+        throw new AlgebricksException("Unsupported Operation " + operation);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
index e1983d2..98427fe 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
@@ -35,7 +35,7 @@ import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 17f2285..cf0f1c2 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -38,10 +38,10 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -325,12 +325,16 @@ public class LogicalOperatorPrettyPrintVisitor implements ILogicalOperatorVisito
     }
 
     @Override
-    public String visitInsertDeleteOperator(InsertDeleteOperator op, Integer indent) throws AlgebricksException {
+    public String visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        String header = op.getOperation() == Kind.INSERT ? "insert into " : "delete from ";
+        String header = getIndexOpString(op.getOperation());
         addIndent(buffer, indent).append(header).append(op.getDataSource()).append(" from ")
                 .append(op.getPayloadExpression().getValue().accept(exprVisitor, indent)).append(" partitioned by ");
         pprintExprList(op.getPrimaryKeyExpressions(), buffer, indent);
+        if (op.getOperation() == Kind.UPSERT) {
+            buffer.append(" out: ([" + op.getPrevRecordVar() + "] <-{record-before-upsert}) ");
+        }
         if (op.isBulkload()) {
             buffer.append(" [bulkload]");
         }
@@ -338,19 +342,38 @@ public class LogicalOperatorPrettyPrintVisitor implements ILogicalOperatorVisito
     }
 
     @Override
-    public String visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Integer indent)
+    public String visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Integer indent)
             throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        String header = op.getOperation() == Kind.INSERT ? "insert into " : "delete from ";
+        String header = getIndexOpString(op.getOperation());
         addIndent(buffer, indent).append(header).append(op.getIndexName()).append(" on ")
                 .append(op.getDataSourceIndex().getDataSource()).append(" from ");
-        pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
+        if (op.getOperation() == Kind.UPSERT) { // upsert: print the previous secondary keys, then the new ones
+            buffer.append(" replace:");
+            pprintExprList(op.getPrevSecondaryKeyExprs(), buffer, indent);
+            buffer.append(" with:");
+            pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
+        } else {
+            pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
+        }
         if (op.isBulkload()) {
             buffer.append(" [bulkload]");
         }
         return buffer.toString();
     }
 
+    public String getIndexOpString(Kind opKind) { // header text the visit methods print for opKind
+        switch (opKind) {
+            case DELETE:
+                return "delete from ";
+            case INSERT:
+                return "insert into ";
+            case UPSERT:
+                return "upsert into ";
+        }
+        return null; // only reached when opKind matches none of the cases above
+    }
+
     @Override
     public String visitTokenizeOperator(TokenizeOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 285812c..53c8b69 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -29,9 +29,9 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -113,9 +113,9 @@ public interface ILogicalOperatorVisitor<R, T> {
 
     public R visitWriteResultOperator(WriteResultOperator op, T arg) throws AlgebricksException;
 
-    public R visitInsertDeleteOperator(InsertDeleteOperator op, T tag) throws AlgebricksException;
+    public R visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, T tag) throws AlgebricksException;
 
-    public R visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, T tag) throws AlgebricksException;
+    public R visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, T tag) throws AlgebricksException;
 
     public R visitExternalDataLookupOperator(ExternalDataLookupOperator op, T arg) throws AlgebricksException;
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java
index b5ce212..73628c1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java
@@ -19,8 +19,8 @@
 package org.apache.hyracks.algebricks.core.algebra.visitors;
 
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
@@ -43,12 +43,12 @@ public interface IQueryOperatorVisitor<R, T> extends ILogicalOperatorVisitor<R,
     }
 
     @Override
-    public default R visitInsertDeleteOperator(InsertDeleteOperator op, T arg) {
+    public default R visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, T arg) {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public default R visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, T arg) {
+    public default R visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, T arg) {
         throw new UnsupportedOperationException();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index 8f9ab9f..8bf3dbb 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -84,7 +84,7 @@ public class PigletMetadataProvider implements IMetadataProvider<String, String>
             List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
             List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
             IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         PigletFileDataSource ds = (PigletFileDataSource) dataSource;
 
         FileSplit[] fileSplits = ds.getFileSplits();
@@ -141,15 +141,15 @@ public class PigletMetadataProvider implements IMetadataProvider<String, String>
     @Override
     public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         PigletFileDataSink ds = (PigletFileDataSink) sink;
         FileSplit[] fileSplits = ds.getFileSplits();
         String[] locations = new String[fileSplits.length];
         for (int i = 0; i < fileSplits.length; ++i) {
             locations[i] = fileSplits[i].getNodeName();
         }
-        IPushRuntimeFactory prf = new SinkWriterRuntimeFactory(printColumns, printerFactories, fileSplits[0]
-                .getLocalFile().getFile(), PrinterBasedWriterFactory.INSTANCE, inputDesc);
+        IPushRuntimeFactory prf = new SinkWriterRuntimeFactory(printColumns, printerFactories,
+                fileSplits[0].getLocalFile().getFile(), PrinterBasedWriterFactory.INSTANCE, inputDesc);
         AlgebricksAbsolutePartitionConstraint constraint = new AlgebricksAbsolutePartitionConstraint(locations);
         return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(prf, constraint);
     }
@@ -178,15 +178,11 @@ public class PigletMetadataProvider implements IMetadataProvider<String, String>
 
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
-            IDataSourceIndex<String, String> dataSource,
-            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
-            IVariableTypeEnvironment typeEnv,
-            List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> secondaryKeys,
-            List<LogicalVariable> additionalNonKeyFields,
-            ILogicalExpression filterExpr,
-            RecordDescriptor recordDesc, JobGenContext context,
-            JobSpecification spec, boolean bulkload) throws AlgebricksException {
+            IDataSourceIndex<String, String> dataSource, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
+            boolean bulkload) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
     }
@@ -197,16 +193,16 @@ public class PigletMetadataProvider implements IMetadataProvider<String, String>
             IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
             ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
     }
 
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getTokenizerRuntime(
-            IDataSourceIndex<String, String> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
-            IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
-            ILogicalExpression filterExpr, RecordDescriptor recordDesc,
+            IDataSourceIndex<String, String> dataSource, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
             JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
@@ -235,4 +231,24 @@ public class PigletMetadataProvider implements IMetadataProvider<String, String>
         return null;
     }
 
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<String> dataSource,
+            IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
+            LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields, LogicalVariable prevPayload,
+            RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
+            IDataSourceIndex<String, String> dataSourceIndex, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys,
+            ILogicalExpression filterExpr, List<LogicalVariable> prevSecondaryKeys,
+            LogicalVariable prevAdditionalFilteringKeys, RecordDescriptor inputDesc, JobGenContext context,
+            JobSpecification spec) throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 394524b..d85ffe9 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -23,7 +23,6 @@ import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -45,9 +44,10 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOpe
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
@@ -62,8 +62,8 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleS
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexBulkloadPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeletePOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.InsertDeletePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeleteUpsertPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.InsertDeleteUpsertPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreclusteredGroupByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedTupleSourcePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
@@ -95,7 +95,8 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
     }
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
         // if (context.checkIfInDontApplySet(this, op)) {
         // return false;
@@ -116,7 +117,7 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
         }
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, boolean topLevelOp,
             IOptimizationContext context) throws AlgebricksException {
         PhysicalOptimizationConfig physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
@@ -152,8 +153,8 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
                     if (gby.getNestedPlans().size() == 1) {
                         ILogicalPlan p0 = gby.getNestedPlans().get(0);
                         if (p0.getRoots().size() == 1) {
-                            if (gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE
-                                    || gby.getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE) {
+                            if (gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE || gby
+                                    .getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE) {
                                 if (!topLevelOp) {
                                     throw new NotImplementedException(
                                             "External hash group-by for nested grouping is not implemented.");
@@ -213,8 +214,8 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
                         }
                     }
                     if (topLevelOp) {
-                        op.setPhysicalOperator(new StableSortPOperator(physicalOptimizationConfig
-                                .getMaxFramesExternalSort()));
+                        op.setPhysicalOperator(
+                                new StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort()));
                     } else {
                         op.setPhysicalOperator(new InMemoryStableSortPOperator());
                     }
@@ -282,12 +283,13 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
                         additionalFilteringKeys = new ArrayList<LogicalVariable>();
                         getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
                     }
-                    op.setPhysicalOperator(new WriteResultPOperator(opLoad.getDataSource(), payload, keys,
-                            additionalFilteringKeys));
+                    op.setPhysicalOperator(
+                            new WriteResultPOperator(opLoad.getDataSource(), payload, keys, additionalFilteringKeys));
                     break;
                 }
-                case INSERT_DELETE: {
-                    InsertDeleteOperator opLoad = (InsertDeleteOperator) op;
+                case INSERT_DELETE_UPSERT: {
+                    // Primary index
+                    InsertDeleteUpsertOperator opLoad = (InsertDeleteUpsertOperator) op;
                     LogicalVariable payload;
                     List<LogicalVariable> keys = new ArrayList<LogicalVariable>();
                     List<LogicalVariable> additionalFilteringKeys = null;
@@ -297,16 +299,17 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
                         getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
                     }
                     if (opLoad.isBulkload()) {
-                        op.setPhysicalOperator(new BulkloadPOperator(payload, keys, additionalFilteringKeys, opLoad
-                                .getDataSource()));
+                        op.setPhysicalOperator(
+                                new BulkloadPOperator(payload, keys, additionalFilteringKeys, opLoad.getDataSource()));
                     } else {
-                        op.setPhysicalOperator(new InsertDeletePOperator(payload, keys, additionalFilteringKeys, opLoad
-                                .getDataSource()));
+                        op.setPhysicalOperator(new InsertDeleteUpsertPOperator(payload, keys, additionalFilteringKeys,
+                                opLoad.getDataSource(), opLoad.getOperation(), opLoad.getPrevRecordVar()));
                     }
                     break;
                 }
-                case INDEX_INSERT_DELETE: {
-                    IndexInsertDeleteOperator opInsDel = (IndexInsertDeleteOperator) op;
+                case INDEX_INSERT_DELETE_UPSERT: {
+                    // Secondary index
+                    IndexInsertDeleteUpsertOperator opInsDel = (IndexInsertDeleteUpsertOperator) op;
                     List<LogicalVariable> primaryKeys = new ArrayList<LogicalVariable>();
                     List<LogicalVariable> secondaryKeys = new ArrayList<LogicalVariable>();
                     List<LogicalVariable> additionalFilteringKeys = null;
@@ -317,13 +320,24 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
                         getKeys(opInsDel.getAdditionalFilteringExpressions(), additionalFilteringKeys);
                     }
                     if (opInsDel.isBulkload()) {
-                        op.setPhysicalOperator(new IndexBulkloadPOperator(primaryKeys, secondaryKeys,
-                                additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
+                        op.setPhysicalOperator(
+                                new IndexBulkloadPOperator(primaryKeys, secondaryKeys, additionalFilteringKeys,
+                                        opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
                     } else {
-                        op.setPhysicalOperator(new IndexInsertDeletePOperator(primaryKeys, secondaryKeys,
-                                additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
+                        List<LogicalVariable> prevSecondaryKeys = null;
+                        LogicalVariable prevAdditionalFilteringKey = null;
+                        if (opInsDel.getOperation() == Kind.UPSERT) {
+                            prevSecondaryKeys = new ArrayList<LogicalVariable>();
+                            getKeys(opInsDel.getPrevSecondaryKeyExprs(), prevSecondaryKeys);
+                            if (opInsDel.getPrevAdditionalFilteringExpression() != null) {
+                                prevAdditionalFilteringKey = ((VariableReferenceExpression) (opInsDel
+                                        .getPrevAdditionalFilteringExpression()).getValue()).getVariableReference();
+                            }
+                        }
+                        op.setPhysicalOperator(new IndexInsertDeleteUpsertPOperator(primaryKeys, secondaryKeys,
+                                additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex(),
+                                prevSecondaryKeys, prevAdditionalFilteringKey));
                     }
-
                     break;
 
                 }
@@ -335,8 +349,8 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
                     getKeys(opTokenize.getSecondaryKeyExpressions(), secondaryKeys);
                     // Tokenize Operator only operates with a bulk load on a data set with an index
                     if (opTokenize.isBulkload()) {
-                        op.setPhysicalOperator(new TokenizePOperator(primaryKeys, secondaryKeys, opTokenize
-                                .getDataSourceIndex()));
+                        op.setPhysicalOperator(
+                                new TokenizePOperator(primaryKeys, secondaryKeys, opTokenize.getDataSourceIndex()));
                     }
                     break;
                 }
@@ -415,8 +429,8 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
         int n = aggOp.getExpressions().size();
         List<Mutable<ILogicalExpression>> mergeExpressionRefs = new ArrayList<Mutable<ILogicalExpression>>();
         for (int i = 0; i < n; i++) {
-            ILogicalExpression mergeExpr = mergeAggregationExpressionFactory.createMergeAggregation(
-                    originalAggVars.get(i), aggFuncRefs.get(i).getValue(), context);
+            ILogicalExpression mergeExpr = mergeAggregationExpressionFactory
+                    .createMergeAggregation(originalAggVars.get(i), aggFuncRefs.get(i).getValue(), context);
             if (mergeExpr == null) {
                 return false;
             }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
index a89e895..70d4f80 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
@@ -20,6 +20,7 @@ package org.apache.hyracks.dataflow.common.comm.io;
 
 import java.io.DataInputStream;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 import org.apache.hyracks.api.comm.FrameConstants;
 import org.apache.hyracks.api.comm.FrameHelper;
@@ -72,9 +73,8 @@ public class FrameTupleAccessor implements IFrameTupleAccessor {
 
     @Override
     public int getTupleStartOffset(int tupleIndex) {
-        int offset = tupleIndex == 0 ?
-                FrameConstants.TUPLE_START_OFFSET :
-                IntSerDeUtils.getInt(buffer.array(), tupleCountOffset - 4 * tupleIndex);
+        int offset = tupleIndex == 0 ? FrameConstants.TUPLE_START_OFFSET
+                : IntSerDeUtils.getInt(buffer.array(), tupleCountOffset - 4 * tupleIndex);
         return start + offset;
     }
 
@@ -85,16 +85,15 @@ public class FrameTupleAccessor implements IFrameTupleAccessor {
 
     @Override
     public int getTupleEndOffset(int tupleIndex) {
-        return start + IntSerDeUtils
-                .getInt(buffer.array(), tupleCountOffset - FrameConstants.SIZE_LEN * (tupleIndex + 1));
+        return start
+                + IntSerDeUtils.getInt(buffer.array(), tupleCountOffset - FrameConstants.SIZE_LEN * (tupleIndex + 1));
     }
 
     @Override
     public int getFieldStartOffset(int tupleIndex, int fIdx) {
-        return fIdx == 0 ?
-                0 :
-                IntSerDeUtils
-                        .getInt(buffer.array(), getTupleStartOffset(tupleIndex) + (fIdx - 1) * FrameConstants.SIZE_LEN);
+        return fIdx == 0 ? 0
+                : IntSerDeUtils.getInt(buffer.array(),
+                        getTupleStartOffset(tupleIndex) + (fIdx - 1) * FrameConstants.SIZE_LEN);
     }
 
     @Override
@@ -161,4 +160,44 @@ public class FrameTupleAccessor implements IFrameTupleAccessor {
     public int getFieldCount() {
         return recordDescriptor.getFieldCount();
     }
+
+    /*
+     * The two methods below can be used for debugging.
+     * They are safe because they do not print record fields: printing records
+     * using ISerializerDeserializer can print incorrect results or throw exceptions.
+     * A better approach would be to print records using a record pointable.
+     */
+    public void prettyPrint(String prefix, int[] recordFields) {
+        ByteBufferInputStream bbis = new ByteBufferInputStream();
+        DataInputStream dis = new DataInputStream(bbis);
+        int tc = getTupleCount();
+        StringBuilder sb = new StringBuilder();
+        sb.append(prefix).append("TC: " + tc).append("\n");
+        for (int i = 0; i < tc; ++i) {
+            prettyPrint(i, bbis, dis, sb, recordFields);
+        }
+        System.err.println(sb.toString());
+    }
+
+    protected void prettyPrint(int tid, ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb,
+            int[] recordFields) {
+        Arrays.sort(recordFields);
+        sb.append(" tid" + tid + ":(" + getTupleStartOffset(tid) + ", " + getTupleEndOffset(tid) + ")[");
+        for (int j = 0; j < getFieldCount(); ++j) {
+            sb.append("f" + j + ":(" + getFieldStartOffset(tid, j) + ", " + getFieldEndOffset(tid, j) + ") ");
+            sb.append("{");
+            bbis.setByteBuffer(buffer, getTupleStartOffset(tid) + getFieldSlotsLength() + getFieldStartOffset(tid, j));
+            try {
+                if (Arrays.binarySearch(recordFields, j) >= 0) {
+                    sb.append("a record field: only print using pointable");
+                } else {
+                    sb.append(recordDescriptor.getFields()[j].deserialize(dis));
+                }
+            } catch (HyracksDataException e) {
+                e.printStackTrace();
+            }
+            sb.append("}");
+        }
+        sb.append("\n");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/FrameTupleReference.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/FrameTupleReference.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/FrameTupleReference.java
index abf9c37..15ebe52 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/FrameTupleReference.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/FrameTupleReference.java
@@ -21,9 +21,9 @@ package org.apache.hyracks.dataflow.common.data.accessors;
 
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 
-public final class FrameTupleReference implements IFrameTupleReference {
-    private IFrameTupleAccessor fta;
-    private int tIndex;
+public class FrameTupleReference implements IFrameTupleReference {
+    protected IFrameTupleAccessor fta;
+    protected int tIndex;
 
     public void reset(IFrameTupleAccessor fta, int tIndex) {
         this.fta = fta;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/RangePredicate.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/RangePredicate.java b/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/RangePredicate.java
index 0399ede..f1a411b 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/RangePredicate.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/RangePredicate.java
@@ -70,10 +70,12 @@ public class RangePredicate extends AbstractSearchPredicate {
         this.highKeyCmp = highKeyCmp;
     }
 
+    @Override
     public MultiComparator getLowKeyComparator() {
         return lowKeyCmp;
     }
 
+    @Override
     public MultiComparator getHighKeyComparator() {
         return highKeyCmp;
     }
@@ -86,6 +88,7 @@ public class RangePredicate extends AbstractSearchPredicate {
         this.highKeyCmp = highKeyCmp;
     }
 
+    @Override
     public ITupleReference getLowKey() {
         return lowKey;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
index 5aac5a6..7ec4a30 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
@@ -22,33 +22,42 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 
 /**
- * This operation callback allows for arbitrary actions to be taken while traversing 
- * an index structure. The {@link IModificationOperationCallback} will be called on 
+ * This operation callback allows for arbitrary actions to be taken while traversing
+ * an index structure. The {@link IModificationOperationCallback} will be called on
  * all modifying operations (e.g. insert, update, delete...) for all indexes.
- * 
  * @author zheilbron
  */
 public interface IModificationOperationCallback {
+    public enum Operation {
+        INSERT,
+        DELETE
+    }
 
     /**
-     * This method is called on a tuple that is about to traverse an index's structure 
-     * (i.e. before any pages are pinned or latched). 
-     *
+     * This method is called on a tuple that is about to traverse an index's structure
+     * (i.e. before any pages are pinned or latched).
      * The format of the tuple is the format that would be stored in the index itself.
-     * 
-     * @param tuple the tuple that is about to be operated on
+     * @param tuple
+     *            the tuple that is about to be operated on
      */
     public void before(ITupleReference tuple) throws HyracksDataException;
 
     /**
-     * This method is called on a tuple when a tuple with a matching key is found for the 
-     * current operation. It is possible that tuple is null, in which case no tuple with a 
+     * This method is called on a tuple when a tuple with a matching key is found for the
+     * current operation. It is possible that tuple is null, in which case no tuple with a
      * matching key was found.
-     * 
-     * When found is called, the leaf page where the tuple resides will be pinned and latched, 
+     * When found is called, the leaf page where the tuple resides will be pinned and latched,
      * so blocking operations should be avoided.
-     * 
-     * @param tuple a tuple with a matching key, otherwise null if none exists
+     * @param tuple
+     *            a tuple with a matching key, otherwise null if none exists
      */
     public void found(ITupleReference before, ITupleReference after) throws HyracksDataException;
+
+    /**
+     * This call specifies the next operation to be performed. It is used to allow
+     * a single operator to perform different operations per tuple.
+     * @param op the next operation to be performed
+     * @throws HyracksDataException
+     */
+    public void setOp(Operation op) throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchOperationCallback.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchOperationCallback.java
index 08ad1d0..9b1cd47 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchOperationCallback.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchOperationCallback.java
@@ -22,50 +22,56 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 
 /**
- * This operation callback allows for arbitrary actions to be taken while traversing 
- * an index structure. The {@link ISearchOperationCallback} will be called on 
+ * This operation callback allows for arbitrary actions to be taken while traversing
+ * an index structure. The {@link ISearchOperationCallback} will be called on
  * all search operations for ordered indexes only.
- * 
  * @author zheilbron
  */
 public interface ISearchOperationCallback {
 
     /**
-     * During an index search operation, this method will be called on tuples as they are 
-     * passed by with a search cursor. This call will be invoked while a leaf page is latched 
-     * and pinned. If the call returns false, then the page will be unlatched and unpinned and 
-     * {@link #reconcile(ITupleReference)} will be called with the tuple that was not proceeded 
+     * After the harness enters the operation components and before an index search operation starts,
+     * this method will be called on the search key.
+     * @param tuple
+     *            the tuple containing the search key (expected to be a point search key)
+     */
+    public void before(ITupleReference tuple) throws HyracksDataException;
+
+    /**
+     * During an index search operation, this method will be called on tuples as they are
+     * passed by with a search cursor. This call will be invoked while a leaf page is latched
+     * and pinned. If the call returns false, then the page will be unlatched and unpinned and
+     * {@link #reconcile(ITupleReference)} will be called with the tuple that was not proceeded
      * on.
-     * 
-     * @param tuple the tuple that is being passed over by the search cursor
+     * @param tuple
+     *            the tuple that is being passed over by the search cursor
      * @return true to proceed otherwise false to unlatch and unpin, leading to reconciliation
      */
     public boolean proceed(ITupleReference tuple) throws HyracksDataException;
 
     /**
-     * This method is only called on a tuple that was not 'proceeded' on 
-     * (see {@link #proceed(ITupleReference)}). This method allows an opportunity to reconcile 
-     * by performing any necessary actions before resuming the search (e.g. a try-lock may have 
+     * This method is only called on a tuple that was not 'proceeded' on
+     * (see {@link #proceed(ITupleReference)}). This method allows an opportunity to reconcile
+     * by performing any necessary actions before resuming the search (e.g. a try-lock may have
      * failed in the proceed call, and now in reconcile we should take a full (blocking) lock).
-     * 
-     * @param tuple the tuple that failed to proceed
+     * @param tuple
+     *            the tuple that failed to proceed
      */
     public void reconcile(ITupleReference tuple) throws HyracksDataException;
 
     /**
-     * This method is only called on a tuple that was reconciled on, but not found after 
-     * retraversing. This method allows an opportunity to cancel some action that was taken in 
+     * This method is only called on a tuple that was reconciled on, but not found after
+     * retraversing. This method allows an opportunity to cancel some action that was taken in
      * {@link #reconcile(ITupleReference))}.
-     * 
-     * @param tuple the tuple that was previously reconciled
+     * @param tuple
+     *            the tuple that was previously reconciled
      */
     public void cancel(ITupleReference tuple) throws HyracksDataException;
-    
+
     /**
      * This method is only called on a tuple that was reconciled on, and found after
-     * retraversing. This method allows an opportunity to do some subsequent action that was 
+     * retraversing. This method allows an opportunity to do some subsequent action that was
      * taken in {@link #reconcile(ITupleReference))}.
-     * 
      * @param tuple
      *            the tuple that was previously reconciled
      */

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchPredicate.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchPredicate.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchPredicate.java
index 94a6a27..1c0d143 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchPredicate.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchPredicate.java
@@ -21,10 +21,20 @@ package org.apache.hyracks.storage.am.common.api;
 
 import java.io.Serializable;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
 
 public interface ISearchPredicate extends Serializable {
-	public MultiComparator getLowKeyComparator();
+    public MultiComparator getLowKeyComparator();
 
-	public MultiComparator getHighKeyComparator();
+    public MultiComparator getHighKeyComparator();
+
+    /**
+     * Get the search key to be used with a point search operation on the primary index.
+     * This method is only called for point search predicates, which only occur on the primary index.
+     * @return
+     *            the low key of this predicate, which serves as the point search key
+     */
+    public ITupleReference getLowKey();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
index b632ba9..e8ab8dc 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
@@ -27,7 +27,7 @@ import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
 /**
  * Dummy operation callback that simply does nothing.
  */
-public enum NoOpOperationCallback implements IModificationOperationCallback, ISearchOperationCallback {
+public enum NoOpOperationCallback implements IModificationOperationCallback,ISearchOperationCallback {
     INSTANCE;
 
     @Override
@@ -42,12 +42,12 @@ public enum NoOpOperationCallback implements IModificationOperationCallback, ISe
 
     @Override
     public void before(ITupleReference tuple) {
-        // Do nothing.        
+        // Do nothing.
     }
 
     @Override
     public void found(ITupleReference before, ITupleReference after) {
-        // Do nothing.        
+        // Do nothing.
     }
 
     @Override
@@ -59,4 +59,9 @@ public enum NoOpOperationCallback implements IModificationOperationCallback, ISe
     public void complete(ITupleReference tuple) throws HyracksDataException {
         // Do nothing.
     }
+
+    @Override
+    public void setOp(Operation op) throws HyracksDataException {
+        // Do nothing.
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/PermutingFrameTupleReference.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/PermutingFrameTupleReference.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/PermutingFrameTupleReference.java
index 5f27835..b9792c2 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/PermutingFrameTupleReference.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/PermutingFrameTupleReference.java
@@ -19,51 +19,40 @@
 
 package org.apache.hyracks.storage.am.common.tuples;
 
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public class PermutingFrameTupleReference implements IFrameTupleReference {
-	private IFrameTupleAccessor fta;
-	private int tIndex;
-	private int[] fieldPermutation;
-
-	public void setFieldPermutation(int[] fieldPermutation) {
-		this.fieldPermutation = fieldPermutation;
-	}
-
-	public void reset(IFrameTupleAccessor fta, int tIndex) {
-		this.fta = fta;
-		this.tIndex = tIndex;
-	}
-
-	@Override
-	public IFrameTupleAccessor getFrameTupleAccessor() {
-		return fta;
-	}
-
-	@Override
-	public int getTupleIndex() {
-		return tIndex;
-	}
-
-	@Override
-	public int getFieldCount() {
-		return fieldPermutation.length;
-	}
-
-	@Override
-	public byte[] getFieldData(int fIdx) {
-		return fta.getBuffer().array();
-	}
-
-	@Override
-	public int getFieldStart(int fIdx) {
-		return fta.getTupleStartOffset(tIndex) + fta.getFieldSlotsLength()
-				+ fta.getFieldStartOffset(tIndex, fieldPermutation[fIdx]);
-	}
-
-	@Override
-	public int getFieldLength(int fIdx) {
-		return fta.getFieldLength(tIndex, fieldPermutation[fIdx]);
-	}
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public class PermutingFrameTupleReference extends FrameTupleReference {
+    private int[] fieldPermutation;
+
+    public PermutingFrameTupleReference(int[] fieldPermutation) {
+        this.fieldPermutation = fieldPermutation;
+    }
+
+    public PermutingFrameTupleReference() {
+    }
+
+    public void setFieldPermutation(int[] fieldPermutation) {
+        this.fieldPermutation = fieldPermutation;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return fieldPermutation.length;
+    }
+
+    @Override
+    public byte[] getFieldData(int fIdx) {
+        return fta.getBuffer().array();
+    }
+
+    @Override
+    public int getFieldStart(int fIdx) {
+        return fta.getTupleStartOffset(tIndex) + fta.getFieldSlotsLength()
+                + fta.getFieldStartOffset(tIndex, fieldPermutation[fIdx]);
+    }
+
+    @Override
+    public int getFieldLength(int fIdx) {
+        return fta.getFieldLength(tIndex, fieldPermutation[fIdx]);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index ae3aa52..13c6949 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -271,13 +271,13 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
         operationalComponents.clear();
         switch (ctx.getOperation()) {
             case UPDATE:
-            case UPSERT:
             case PHYSICALDELETE:
             case FLUSH:
             case DELETE:
                 operationalComponents.add(memoryComponents.get(cmc));
                 break;
             case INSERT:
+            case UPSERT:
                 addOperationalMutableComponents(operationalComponents);
                 operationalComponents.addAll(immutableComponents);
                 break;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
index 08891c2..fa25524 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
@@ -54,8 +54,8 @@ public final class LSMBTreeOpContext implements ILSMIndexOperationContext {
     public IndexOperation op;
     public final MultiComparator cmp;
     public final MultiComparator bloomFilterCmp;
-    public final IModificationOperationCallback modificationCallback;
-    public final ISearchOperationCallback searchCallback;
+    public IModificationOperationCallback modificationCallback;
+    public ISearchOperationCallback searchCallback;
     private final List<ILSMComponent> componentHolder;
     private final List<ILSMComponent> componentsToBeMerged;
     private final List<ILSMComponent> componentsToBeReplicated;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 63e651a..4c4ed28 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -28,8 +28,8 @@ import org.apache.hyracks.storage.am.common.api.IndexException;
 
 public interface ILSMHarness {
 
-    public void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException,
-            IndexException;
+    public void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple)
+            throws HyracksDataException, IndexException;
 
     public boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple)
             throws HyracksDataException, IndexException;
@@ -45,14 +45,14 @@ public interface ILSMHarness {
     public void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException, IndexException;
 
-    public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
-            IndexException;
+    public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation)
+            throws HyracksDataException, IndexException;
 
     public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException;
 
-    public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
-            IndexException;
+    public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation)
+            throws HyracksDataException, IndexException;
 
     public void addBulkLoadedComponent(ILSMComponent index) throws HyracksDataException, IndexException;
 
@@ -62,5 +62,4 @@ public interface ILSMHarness {
             LSMOperationType opType) throws HyracksDataException;
 
     public void endReplication(ILSMIndexOperationContext ctx) throws HyracksDataException;
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
index 08bcfee..99f981d 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
@@ -39,6 +39,6 @@ public interface ILSMIndexOperationContext extends IIndexOperationContext {
     public void setSearchPredicate(ISearchPredicate searchPredicate);
 
     public ISearchPredicate getSearchPredicate();
-    
+
     public List<ILSMComponent> getComponentsToBeReplicated();
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
index b172864..d99c9e8 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
@@ -22,17 +22,19 @@ package org.apache.hyracks.storage.am.lsm.common.dataflow;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.common.IStorageManagerInterface;
 import org.apache.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
@@ -49,18 +51,20 @@ public class LSMTreeIndexInsertUpdateDeleteOperatorDescriptor extends AbstractTr
             IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, int[] fieldPermutation,
             IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
-            ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackProvider) {
+            ITupleFilterFactory tupleFilterFactory, INullWriterFactory nullWriterFactory,
+            IModificationOperationCallbackFactory modificationOpCallbackProvider,
+            ISearchOperationCallbackFactory searchOpCallbackProvider) {
         super(spec, 1, 1, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, tupleFilterFactory, false,
-                false, null,
-                NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE, modificationOpCallbackProvider);
+                comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, tupleFilterFactory, false, false,
+                nullWriterFactory, NoOpLocalResourceFactoryProvider.INSTANCE, searchOpCallbackProvider,
+                modificationOpCallbackProvider);
         this.fieldPermutation = fieldPermutation;
         this.op = op;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         return new LSMIndexInsertUpdateDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
                 recordDescProvider, op);
     }



Mime
View raw message