asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [3/3] asterixdb git commit: Fix transaction logs and optimize upserts
Date Tue, 14 Mar 2017 18:13:08 GMT
Fix transaction logs and optimize upserts

Previously, transaction logs did not record the previous image,
which made it difficult to undo aborted transactions
correctly. This change fixes that by always recording
the previous image.

In addition, an upsert was performed as a delete (if the
record was found) followed by an insert, producing two logs.
This change makes it a single operation with a single
transaction log.

Change-Id: Ice5296267033cd7debe76894c864c6411f761d83
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1554
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/04075533
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/04075533
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/04075533

Branch: refs/heads/master
Commit: 04075533ae042fa2741a190f37625c308d90a9d2
Parents: 89328a8
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Authored: Mon Mar 13 22:44:59 2017 -0700
Committer: abdullah alamoudi <bamousaa@gmail.com>
Committed: Tue Mar 14 10:37:29 2017 -0700

----------------------------------------------------------------------
 .../algebra/operators/CommitOperator.java       | 15 +----
 .../operators/physical/CommitPOperator.java     | 11 +---
 .../rules/SetupCommitExtensionOpRule.java       | 38 +----------
 .../apache/asterix/app/nc/RecoveryManager.java  | 29 +++++----
 .../app/bootstrap/TestNodeController.java       | 38 +++++------
 .../asterix/test/dataflow/LogMarkerTest.java    | 13 ++--
 .../asterix/test/logging/CheckpointingTest.java | 17 ++---
 .../transactions/AbstractOperationCallback.java |  4 +-
 .../asterix/common/transactions/DatasetId.java  | 19 ++----
 .../asterix/common/transactions/ILogRecord.java |  1 -
 .../transactions/ITransactionManager.java       | 14 ++--
 .../common/transactions/ImmutableDatasetId.java | 33 ++++++++++
 .../asterix/common/transactions/LogRecord.java  |  8 +--
 .../asterix/common/transactions/LogType.java    |  4 --
 asterixdb/asterix-installer/pom.xml             |  1 -
 .../src/main/assembly/binary-assembly.xml       |  1 -
 .../apache/asterix/metadata/MetadataNode.java   | 25 ++++----
 .../asterix/metadata/api/IMetadataIndex.java    |  4 +-
 .../metadata/bootstrap/MetadataBootstrap.java   |  6 +-
 .../metadata/bootstrap/MetadataIndex.java       |  7 +-
 .../metadata/declared/MetadataProvider.java     | 30 +++++----
 .../asterix/metadata/entities/Dataset.java      | 26 ++++----
 .../management/ReplicationChannel.java          |  1 -
 .../job/listener/JobEventListenerFactory.java   |  2 +-
 ...tiTransactionJobletEventListenerFactory.java |  2 +-
 .../LSMPrimaryUpsertOperatorNodePushable.java   | 39 +++++++-----
 .../LSMSecondaryUpsertOperatorNodePushable.java | 14 +++-
 .../std/FlushDatasetOperatorDescriptor.java     |  3 +-
 asterixdb/asterix-transactions/pom.xml          |  5 --
 ...tractIndexModificationOperationCallback.java | 66 ++++++++++++++-----
 .../LockThenSearchOperationCallback.java        |  4 +-
 .../LockThenSearchOperationCallbackFactory.java |  3 +-
 ...maryIndexInstantSearchOperationCallback.java |  5 +-
 ...exInstantSearchOperationCallbackFactory.java |  3 +-
 ...imaryIndexModificationOperationCallback.java | 21 ++----
 ...dexModificationOperationCallbackFactory.java | 16 ++---
 .../PrimaryIndexSearchOperationCallback.java    |  3 +-
 ...maryIndexSearchOperationCallbackFactory.java | 13 ++--
 ...ndaryIndexModificationOperationCallback.java | 17 ++---
 ...dexModificationOperationCallbackFactory.java | 20 +++---
 .../SecondaryIndexSearchOperationCallback.java  |  3 +-
 ...tasetIndexModificationOperationCallback.java | 10 ++-
 ...dexModificationOperationCallbackFactory.java | 17 ++---
 ...dexModificationOperationCallbackFactory.java | 17 ++---
 .../opcallbacks/UpsertOperationCallback.java    | 14 ++--
 .../UpsertOperationCallbackFactory.java         | 20 +++---
 .../runtime/CommitRuntimeFactory.java           | 10 +--
 .../management/runtime/UpsertCommitRuntime.java | 54 ----------------
 .../service/locking/ConcurrentLockManager.java  | 67 +++++++++++---------
 .../management/service/logging/LogBuffer.java   | 17 ++---
 .../logging/LogManagerWithReplication.java      |  1 -
 .../management/service/recovery/TxnId.java      |  4 +-
 .../management/service/locking/Request.java     | 14 ++++
 asterixdb/asterix-yarn/pom.xml                  |  2 -
 .../AbstractOneInputOneOutputPushRuntime.java   |  1 -
 .../api/IModificationOperationCallback.java     | 16 +----
 .../am/common/impls/NoOpOperationCallback.java  |  5 --
 .../storage/am/lsm/btree/impls/LSMBTree.java    |  2 +-
 .../am/lsm/common/api/ILSMIndexAccessor.java    |  9 +++
 .../lsm/common/impls/LSMTreeIndexAccessor.java  |  6 ++
 .../impls/LSMInvertedIndexAccessor.java         |  5 ++
 ...stractModificationOperationCallbackTest.java |  5 --
 .../am/common/TestOperationCallback.java        |  6 --
 63 files changed, 419 insertions(+), 467 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
index 1ee130c..6578c9c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
@@ -32,24 +32,20 @@ import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionRef
 public class CommitOperator extends AbstractDelegatedLogicalOperator {
 
     private List<LogicalVariable> primaryKeyLogicalVars;
-    private LogicalVariable upsertVar;
     private boolean isSink;
 
     public CommitOperator(boolean isSink) {
         this.isSink = isSink;
-        this.upsertVar = null;
         primaryKeyLogicalVars = new ArrayList<>();
     }
 
-    public CommitOperator(List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar, boolean isSink) {
+    public CommitOperator(List<LogicalVariable> primaryKeyLogicalVars, boolean isSink) {
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
-        this.upsertVar = upsertVar;
         this.isSink = isSink;
     }
 
     @Override
     public boolean isMap() {
-        // TODO Auto-generated method stub
         return false;
     }
 
@@ -62,20 +58,18 @@ public class CommitOperator extends AbstractDelegatedLogicalOperator {
     }
 
     //Provided for Extensions but not used by core
-    public void setVars(List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar) {
+    public void setVars(List<LogicalVariable> primaryKeyLogicalVars) {
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
-        this.upsertVar = upsertVar;
     }
 
     @Override
     public IOperatorDelegate newInstance() {
-        return new CommitOperator(primaryKeyLogicalVars, upsertVar, isSink);
+        return new CommitOperator(primaryKeyLogicalVars, isSink);
     }
 
     @Override
     public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
             throws AlgebricksException {
-        // TODO Auto-generated method stub
         return false;
     }
 
@@ -87,9 +81,6 @@ public class CommitOperator extends AbstractDelegatedLogicalOperator {
     @Override
     public void getUsedVariables(Collection<LogicalVariable> usedVars) {
         usedVars.addAll(primaryKeyLogicalVars);
-        if (upsertVar != null) {
-            usedVars.add(upsertVar);
-        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
index d0cee55..20c69c4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -46,15 +46,12 @@ public class CommitPOperator extends AbstractPhysicalOperator {
     private final List<LogicalVariable> primaryKeyLogicalVars;
     private final JobId jobId;
     private final Dataset dataset;
-    private final LogicalVariable upsertVar;
     private final boolean isSink;
 
-    public CommitPOperator(JobId jobId, Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars,
-            LogicalVariable upsertVar, boolean isSink) {
+    public CommitPOperator(JobId jobId, Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars, boolean isSink) {
         this.jobId = jobId;
         this.dataset = dataset;
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
-        this.upsertVar = upsertVar;
         this.isSink = isSink;
     }
 
@@ -98,12 +95,8 @@ public class CommitPOperator extends AbstractPhysicalOperator {
         for (int i = 0; i < splitsForDataset.length; i++) {
             datasetPartitions[i] = i;
         }
-        int upsertVarIdx = -1;
-        if (upsertVar != null) {
-            upsertVarIdx = inputSchemas[0].findVariable(upsertVar);
-        }
         IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(jobId, primaryKeyFields, metadataProvider,
-                upsertVarIdx, datasetPartitions, isSink);
+                datasetPartitions, isSink);
         builder.contributeMicroOperator(op, runtime, recDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
index 9b442ae..18e5f5e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
@@ -24,28 +24,21 @@ import java.util.List;
 import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.algebra.operators.physical.CommitPOperator;
 import org.apache.asterix.common.transactions.JobId;
-import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule {
@@ -73,7 +66,6 @@ public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule {
         List<Mutable<ILogicalExpression>> primaryKeyExprs = null;
         Dataset dataset = null;
         AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
-        LogicalVariable upsertVar = null;
         while (descendantOp != null) {
             if (descendantOp.getOperatorTag() == LogicalOperatorTag.INDEX_INSERT_DELETE_UPSERT) {
                 IndexInsertDeleteUpsertOperator operator = (IndexInsertDeleteUpsertOperator) descendantOp;
@@ -87,32 +79,6 @@ public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule {
                 if (!insertDeleteUpsertOperator.isBulkload()) {
                     primaryKeyExprs = insertDeleteUpsertOperator.getPrimaryKeyExpressions();
                     dataset = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset();
-                    if (insertDeleteUpsertOperator.getOperation() == Kind.UPSERT) {
-                        //we need to add a function that checks if previous record was found
-                        upsertVar = context.newVar();
-                        AbstractFunctionCallExpression orFunc =
-                                new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.OR));
-                        // is new value missing? -> this means that the expected operation is delete
-                        AbstractFunctionCallExpression isNewMissingFunc = new ScalarFunctionCallExpression(
-                                FunctionUtil.getFunctionInfo(BuiltinFunctions.IS_MISSING));
-                        isNewMissingFunc.getArguments().add(insertDeleteUpsertOperator.getPayloadExpression());
-                        AbstractFunctionCallExpression isPrevMissingFunc = new ScalarFunctionCallExpression(
-                                FunctionUtil.getFunctionInfo(BuiltinFunctions.IS_MISSING));
-                        // argument is the previous record
-                        isPrevMissingFunc.getArguments().add(new MutableObject<ILogicalExpression>(
-                                new VariableReferenceExpression(insertDeleteUpsertOperator.getBeforeOpRecordVar())));
-                        orFunc.getArguments().add(new MutableObject<ILogicalExpression>(isPrevMissingFunc));
-                        orFunc.getArguments().add(new MutableObject<ILogicalExpression>(isNewMissingFunc));
-
-                        // AssignOperator puts in the cast var the casted record
-                        AssignOperator upsertFlagAssign =
-                                new AssignOperator(upsertVar, new MutableObject<ILogicalExpression>(orFunc));
-                        // Connect the current top of the plan to the cast operator
-                        upsertFlagAssign.getInputs().add(new MutableObject<>(eOp.getInputs().get(0).getValue()));
-                        eOp.getInputs().clear();
-                        eOp.getInputs().add(new MutableObject<ILogicalOperator>(upsertFlagAssign));
-                        context.computeAndSetTypeEnvironmentForOperator(upsertFlagAssign);
-                    }
                     break;
                 }
             }
@@ -138,9 +104,9 @@ public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule {
         JobId jobId = mp.getJobId();
 
         //create the logical and physical operator
-        CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, upsertVar, isSink);
+        CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, isSink);
         CommitPOperator commitPOperator =
-                new CommitPOperator(jobId, dataset, primaryKeyLogicalVars, upsertVar, isSink);
+                new CommitPOperator(jobId, dataset, primaryKeyLogicalVars, isSink);
         commitOperator.setPhysicalOperator(commitPOperator);
 
         //create ExtensionOperator and put the commitOperator in it.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 6e4e4cb..07f341d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -57,6 +57,7 @@ import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.transactions.Resource;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
 import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager;
@@ -68,7 +69,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -239,7 +239,6 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                     jobCommitLogCount++;
                     break;
                 case LogType.ENTITY_COMMIT:
-                case LogType.UPSERT_ENTITY_COMMIT:
                     if (partitions.contains(logRecord.getResourcePartition())) {
                         analyzeEntityCommitLog(logRecord);
                         entityCommitLogCount++;
@@ -405,7 +404,6 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                     case LogType.ENTITY_COMMIT:
                     case LogType.ABORT:
                     case LogType.FLUSH:
-                    case LogType.UPSERT_ENTITY_COMMIT:
                     case LogType.WAIT:
                     case LogType.MARKER:
                         //do nothing
@@ -598,13 +596,12 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                         }
                         break;
                     case LogType.ENTITY_COMMIT:
-                    case LogType.UPSERT_ENTITY_COMMIT:
                         if (activePartitions.contains(logRecord.getResourcePartition())) {
                             jobLoserEntity2LSNsMap.remove(tempKeyTxnId);
                             entityCommitLogCount++;
                             if (IS_DEBUG_MODE) {
-                                LOGGER.info(Thread.currentThread().getId() + "======> entity_commit[" + currentLSN
-                                        + "]" + tempKeyTxnId);
+                                LOGGER.info(Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
+                                        + tempKeyTxnId);
                             }
                         }
                         break;
@@ -686,10 +683,17 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                     (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
             ILSMIndexAccessor indexAccessor =
                     index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-            if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
+            if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.INSERT_BYTE) {
                 indexAccessor.forceDelete(logRecord.getNewValue());
-            } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) {
-                indexAccessor.forceInsert(logRecord.getNewValue());
+            } else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.DELETE_BYTE) {
+                indexAccessor.forceInsert(logRecord.getOldValue());
+            } else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.UPSERT_BYTE) {
+                // undo, upsert the old value if found, otherwise, physical delete
+                if (logRecord.getOldValue() == null) {
+                    indexAccessor.forcePhysicalDelete(logRecord.getNewValue());
+                } else {
+                    indexAccessor.forceUpsert(logRecord.getOldValue());
+                }
             } else {
                 throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp());
             }
@@ -705,10 +709,13 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(datasetId, resourceId);
             ILSMIndexAccessor indexAccessor =
                     index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-            if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
+            if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.INSERT_BYTE) {
                 indexAccessor.forceInsert(logRecord.getNewValue());
-            } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) {
+            } else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.DELETE_BYTE) {
                 indexAccessor.forceDelete(logRecord.getNewValue());
+            } else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.UPSERT_BYTE) {
+                // redo, upsert the new value
+                indexAccessor.forceUpsert(logRecord.getNewValue());
             } else {
                 throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp());
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index b7026ef..53f5f62 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -53,11 +53,13 @@ import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
 import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
 import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
 import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadataFactory;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
 import org.apache.asterix.transaction.management.runtime.CommitRuntime;
 import org.apache.asterix.transaction.management.service.logging.LogReader;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
@@ -159,18 +161,20 @@ public class TestNodeController {
         return new org.apache.asterix.common.transactions.JobId((int) jobId.getId());
     }
 
-    public LSMInsertDeleteOperatorNodePushable getInsertPipeline(IHyracksTaskContext ctx, Dataset dataset,
-            IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
-            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields,
-            int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
-            StorageComponentProvider storageComponentProvider) throws AlgebricksException, HyracksDataException {
+    public Pair<LSMInsertDeleteOperatorNodePushable, CommitRuntime> getInsertPipeline(IHyracksTaskContext ctx,
+            Dataset dataset, IAType[] primaryKeyTypes,
+            ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, int[] filterFields, int[] primaryKeyIndexes,
+            List<Integer> primaryKeyIndicators, StorageComponentProvider storageComponentProvider)
+            throws AlgebricksException, HyracksDataException {
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
                 mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
                 storageComponentProvider);
         IndexOperation op = IndexOperation.INSERT;
         IModificationOperationCallbackFactory modOpCallbackFactory =
                 new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(), dataset.getDatasetId(),
-                        primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, op, ResourceType.LSM_BTREE, true);
+                        primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, Operation.get(op),
+                        ResourceType.LSM_BTREE);
         LSMTreeInsertDeleteOperatorDescriptor indexOpDesc =
                 getInsertOpratorDesc(primaryIndexInfo, modOpCallbackFactory);
         IIndexDataflowHelperFactory dataflowHelperFactory =
@@ -183,7 +187,7 @@ public class TestNodeController {
                 primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION, true);
         insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
         commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
-        return insertOp;
+        return Pair.of(insertOp, commitOp);
     }
 
     public IPushRuntime getFullScanPipeline(IFrameWriter countOp, IHyracksTaskContext ctx, Dataset dataset,
@@ -298,8 +302,7 @@ public class TestNodeController {
         Index index = primaryIndexInfo.getIndex();
         MetadataProvider mdProvider = new MetadataProvider(dataverse, storageComponentProvider);
         return dataset.getIndexDataflowHelperFactory(mdProvider, index, primaryIndexInfo.recordType,
-                primaryIndexInfo.metaType, primaryIndexInfo.mergePolicyFactory,
-                primaryIndexInfo.mergePolicyProperties);
+                primaryIndexInfo.metaType, primaryIndexInfo.mergePolicyFactory, primaryIndexInfo.mergePolicyProperties);
     }
 
     public IIndexDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, IAType[] primaryKeyTypes,
@@ -431,11 +434,10 @@ public class TestNodeController {
         private Index index;
         private IStorageComponentProvider storageComponentProvider;
 
-        public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
-                ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
-                Map<String, String> mergePolicyProperties, int[] filterFields, int[] primaryKeyIndexes,
-                List<Integer> primaryKeyIndicators, IStorageComponentProvider storageComponentProvider)
-                throws AlgebricksException {
+        public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
+                ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+                int[] filterFields, int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
+                IStorageComponentProvider storageComponentProvider) throws AlgebricksException {
             this.storageComponentProvider = storageComponentProvider;
             this.dataset = dataset;
             this.primaryKeyTypes = primaryKeyTypes;
@@ -474,10 +476,10 @@ public class TestNodeController {
             index = new Index(dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName(),
                     IndexType.BTREE, keyFieldNames, keyFieldSourceIndicators, keyFieldTypes, false, true,
                     MetadataUtil.PENDING_NO_OP);
-            localResourceFactoryProvider = getPrimaryIndexLocalResourceMetadataProvider(storageComponentProvider,
-                    index, dataset, primaryIndexTypeTraits, primaryIndexComparatorFactories,
-                    primaryIndexBloomFilterKeyFields, mergePolicyFactory, mergePolicyProperties, filterTypeTraits,
-                    filterCmpFactories, btreeFields, filterFields, dataset.getIndexOperationTrackerFactory(index));
+            localResourceFactoryProvider = getPrimaryIndexLocalResourceMetadataProvider(storageComponentProvider, index,
+                    dataset, primaryIndexTypeTraits, primaryIndexComparatorFactories, primaryIndexBloomFilterKeyFields,
+                    mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields,
+                    filterFields, dataset.getIndexOperationTrackerFactory(index));
         }
 
         public Index getIndex() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index 1467dbf..1bcc88a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -115,9 +115,10 @@ public class LogMarkerTest {
                 IHyracksTaskContext ctx = nc.createTestContext(true);
                 nc.newJobId();
                 ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
-                LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
-                        RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null, KEY_INDEXES,
-                        KEY_INDICATORS_LIST, storageManager);
+                LSMInsertDeleteOperatorNodePushable insertOp = nc
+                        .getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
+                                null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager)
+                        .getLeft();
                 insertOp.open();
                 TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
                         RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
@@ -142,9 +143,9 @@ public class LogMarkerTest {
                     tupleAppender.write(insertOp, true);
                 }
                 insertOp.close();
-                nc.getTransactionManager().completedTransaction(txnCtx, new DatasetId(-1), -1, true);
-                IIndexDataflowHelper dataflowHelper = nc.getPrimaryIndexDataflowHelper(dataset, KEY_TYPES,
-                        RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES,
+                nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+                IIndexDataflowHelper dataflowHelper = nc.getPrimaryIndexDataflowHelper(dataset, KEY_TYPES, RECORD_TYPE,
+                        META_TYPE, new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES,
                         KEY_INDICATORS_LIST);
                 dataflowHelper.open();
                 LSMBTree btree = (LSMBTree) dataflowHelper.getIndexInstance();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 34bb9cf..35c3c42 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -62,14 +62,14 @@ public class CheckpointingTest {
 
     private static final String DEFAULT_TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
     private static final String TEST_CONFIG_FILE_NAME = "asterix-test-configuration.xml";
-    private static final String TEST_CONFIG_PATH = System.getProperty("user.dir") + File.separator + "target"
-            + File.separator + "config";
+    private static final String TEST_CONFIG_PATH =
+            System.getProperty("user.dir") + File.separator + "target" + File.separator + "config";
     private static final String TEST_CONFIG_FILE_PATH = TEST_CONFIG_PATH + File.separator + TEST_CONFIG_FILE_NAME;
     private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
     private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
             new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
-    private static final GenerationFunction[] RECORD_GEN_FUNCTION = { GenerationFunction.DETERMINISTIC,
-            GenerationFunction.DETERMINISTIC };
+    private static final GenerationFunction[] RECORD_GEN_FUNCTION =
+            { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
     private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
     private static final ARecordType META_TYPE = null;
     private static final GenerationFunction[] META_GEN_FUNCTION = null;
@@ -123,9 +123,10 @@ public class CheckpointingTest {
                 nc.newJobId();
                 ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
                 // Prepare insert operation
-                LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
-                        RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATOR_LIST,
-                        storageManager);
+                LSMInsertDeleteOperatorNodePushable insertOp = nc
+                        .getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
+                                null, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager)
+                        .getLeft();
                 insertOp.open();
                 TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
                         RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
@@ -195,7 +196,7 @@ public class CheckpointingTest {
                     tupleAppender.write(insertOp, true);
                 }
                 insertOp.close();
-                nc.getTransactionManager().completedTransaction(txnCtx, new DatasetId(-1), -1, true);
+                nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
             } finally {
                 nc.deInit();
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallback.java
index ba7a780..9844344 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallback.java
@@ -32,9 +32,9 @@ public abstract class AbstractOperationCallback {
     protected final ILockManager lockManager;
     protected final long[] longHashes;
 
-    public AbstractOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
+    public AbstractOperationCallback(DatasetId datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
             ILockManager lockManager) {
-        this.datasetId = new DatasetId(datasetId);
+        this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
         this.txnCtx = txnCtx;
         this.lockManager = lockManager;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java
index 81ece13..da6988c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java
@@ -21,27 +21,22 @@ package org.apache.asterix.common.transactions;
 import java.io.Serializable;
 
 public class DatasetId implements Serializable {
-    /**
-     *
-     */
+
     private static final long serialVersionUID = 1L;
-    /**
-     * The number of bytes used to represent {@link DatasetId} value.
-     */
     public static final int BYTES = Integer.BYTES;
-
-    int id;
+    public static final DatasetId NULL = new ImmutableDatasetId(-1);
+    private int id;
 
     public DatasetId(int id) {
         this.id = id;
     }
 
-    public void setId(int id) {
-        this.id = id;
+    public final int getId() {
+        return id;
     }
 
-    public int getId() {
-        return id;
+    public void setId(int id) {
+        this.id = id;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 29af931..6ee0980 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -47,7 +47,6 @@ public interface ILogRecord {
     public static final int SEQ_NUM_LEN = Long.BYTES;
     public static final int TYPE_LEN = Byte.BYTES;
     public static final int UUID_LEN = Long.BYTES;
-    public static final int VBUCKET_ID_LEN = Short.BYTES;
 
     public static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
     public static final int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
index ebad94d..0123814 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
@@ -70,13 +70,13 @@ public interface ITransactionManager {
      *            the transaction context associated with the transaction
      * @param datasetId
      *            TODO
-     * @param PKHashVal
+     * @param pkHash
      *            TODO
      * @throws ACIDException
      * @see ITransactionContext
      * @see ACIDException
      */
-    public void commitTransaction(ITransactionContext txnContext, DatasetId datasetId, int PKHashVal)
+    public void commitTransaction(ITransactionContext txnContext, DatasetId datasetId, int pkHash)
             throws ACIDException;
 
     /**
@@ -86,13 +86,13 @@ public interface ITransactionManager {
      *            the transaction context associated with the transaction
      * @param datasetId
      *            TODO
-     * @param PKHashVal
+     * @param pkHash
      *            TODO
      * @throws ACIDException
      * @see ITransactionContext
      * @see ACIDException
      */
-    public void abortTransaction(ITransactionContext txnContext, DatasetId datasetId, int PKHashVal)
+    public void abortTransaction(ITransactionContext txnContext, DatasetId datasetId, int pkHash)
             throws ACIDException;
 
     /**
@@ -104,15 +104,15 @@ public interface ITransactionManager {
      *            the transaction context associated with the transaction
      * @param datasetId
      *            TODO
-     * @param PKHashVal
+     * @param pkHash
      *            TODO
      * @param success
      *            indicates the success or failure. The transaction is committed
      *            or aborted accordingly.
      * @throws ACIDException
      */
-    public void completedTransaction(ITransactionContext txnContext, DatasetId datasetId, int PKHashVal, boolean success)
-            throws ACIDException;
+    public void completedTransaction(ITransactionContext txnContext, DatasetId datasetId, int pkHash,
+            boolean success) throws ACIDException;
 
     /**
      * Returns the Transaction Provider for the transaction eco-system. A

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ImmutableDatasetId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ImmutableDatasetId.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ImmutableDatasetId.java
new file mode 100644
index 0000000..c47ca10
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ImmutableDatasetId.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+public class ImmutableDatasetId extends DatasetId {
+    private static final long serialVersionUID = 1L;
+
+    public ImmutableDatasetId(int datasetId) {
+        super(datasetId);
+    }
+
+    @Override
+    public void setId(int datasetId) {
+        throw new UnsupportedOperationException();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index d691b18..e80cfa6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -133,7 +133,6 @@ public class LogRecord implements ILogRecord {
         buffer.putInt(jobId);
         switch (logType) {
             case LogType.ENTITY_COMMIT:
-            case LogType.UPSERT_ENTITY_COMMIT:
                 writeEntityInfo(buffer);
                 break;
             case LogType.UPDATE:
@@ -252,7 +251,7 @@ public class LogRecord implements ILogRecord {
         jobId = buffer.getInt();
         switch (logType) {
             case LogType.FLUSH:
-                if (buffer.remaining() < DatasetId.BYTES) {
+                if (buffer.remaining() < ILogRecord.DS_LEN) {
                     return RecordReadStatus.TRUNCATED;
                 }
                 datasetId = buffer.getInt();
@@ -268,7 +267,6 @@ public class LogRecord implements ILogRecord {
                 computeAndSetLogSize();
                 break;
             case LogType.ENTITY_COMMIT:
-            case LogType.UPSERT_ENTITY_COMMIT:
                 if (readEntityInfo(buffer)) {
                     computeAndSetLogSize();
                 } else {
@@ -308,6 +306,7 @@ public class LogRecord implements ILogRecord {
                         oldValue = readTuple(buffer, readOldValue, oldValueFieldCount, oldValueSize);
                     } else {
                         oldValueSize = 0;
+                        oldValue = null;
                     }
                 } else {
                     return RecordReadStatus.TRUNCATED;
@@ -428,7 +427,6 @@ public class LogRecord implements ILogRecord {
                 logSize = JOB_TERMINATE_LOG_SIZE;
                 break;
             case LogType.ENTITY_COMMIT:
-            case LogType.UPSERT_ENTITY_COMMIT:
                 logSize = ENTITY_COMMIT_LOG_BASE_SIZE + PKValueSize;
                 break;
             case LogType.FLUSH:
@@ -457,7 +455,7 @@ public class LogRecord implements ILogRecord {
         builder.append(" LogType : ").append(LogType.toString(logType));
         builder.append(" LogSize : ").append(logSize);
         builder.append(" JobId : ").append(jobId);
-        if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPSERT_ENTITY_COMMIT || logType == LogType.UPDATE) {
+        if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPDATE) {
             builder.append(" DatasetId : ").append(datasetId);
             builder.append(" ResourcePartition : ").append(resourcePartition);
             builder.append(" PKHashValue : ").append(PKHashValue);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
index 269e4b9..11c45ad 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
@@ -25,7 +25,6 @@ public class LogType {
     public static final byte ENTITY_COMMIT = 2;
     public static final byte ABORT = 3;
     public static final byte FLUSH = 4;
-    public static final byte UPSERT_ENTITY_COMMIT = 5;
     public static final byte WAIT = 6;
     public static final byte MARKER = 7;
 
@@ -34,7 +33,6 @@ public class LogType {
     private static final String STRING_ENTITY_COMMIT = "ENTITY_COMMIT";
     private static final String STRING_ABORT = "ABORT";
     private static final String STRING_FLUSH = "FLUSH";
-    private static final String STRING_UPSERT_ENTITY_COMMIT = "UPSERT_ENTITY_COMMIT";
     private static final String STRING_WAIT = "WAIT";
     private static final String STRING_MARKER = "MARKER";
     private static final String STRING_UNKNOWN_LOG_TYPE = "UNKNOWN_LOG_TYPE";
@@ -51,8 +49,6 @@ public class LogType {
                 return STRING_ABORT;
             case LogType.FLUSH:
                 return STRING_FLUSH;
-            case LogType.UPSERT_ENTITY_COMMIT:
-                return STRING_UPSERT_ENTITY_COMMIT;
             case LogType.WAIT:
                 return STRING_WAIT;
             case LogType.MARKER:

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-installer/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/pom.xml b/asterixdb/asterix-installer/pom.xml
index 8b200bb..a6e7541 100644
--- a/asterixdb/asterix-installer/pom.xml
+++ b/asterixdb/asterix-installer/pom.xml
@@ -106,7 +106,6 @@
             <dependencySet>
               <includes>
                 <!-- NOTE! Any changes here must be mirrored in src/main/assembly/binary-assembly.xml -->
-                <include>org.apache.hadoop:hadoop-core</include>
                 <include>commons-cli:commons-cli</include>
                 <include>commons-logging:commons-logging</include>
               </includes>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-installer/src/main/assembly/binary-assembly.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/main/assembly/binary-assembly.xml b/asterixdb/asterix-installer/src/main/assembly/binary-assembly.xml
index 88f2b70..8d3844a 100644
--- a/asterixdb/asterix-installer/src/main/assembly/binary-assembly.xml
+++ b/asterixdb/asterix-installer/src/main/assembly/binary-assembly.xml
@@ -104,7 +104,6 @@
     <dependencySet>
       <includes>
         <!-- NOTE! Any changes here must be mirrored in asterix-installer pom.xml for licensegen -->
-        <include>org.apache.hadoop:hadoop-core</include>
         <include>commons-cli:commons-cli</include>
         <include>commons-logging:commons-logging</include>
       </includes>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 51790e6..7a04ce8 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -41,6 +41,7 @@ import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.ImmutableDatasetId;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -94,6 +95,7 @@ import org.apache.asterix.om.types.AUnionType;
 import org.apache.asterix.om.types.AbstractComplexType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallback;
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
@@ -114,7 +116,6 @@ import org.apache.hyracks.storage.am.common.api.IndexException;
 import org.apache.hyracks.storage.am.common.api.TreeIndexException;
 import org.apache.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
@@ -123,8 +124,8 @@ import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 public class MetadataNode implements IMetadataNode {
     private static final long serialVersionUID = 1L;
 
-    private static final DatasetId METADATA_DATASET_ID = new DatasetId(
-            MetadataPrimaryIndexes.PROPERTIES_METADATA.getDatasetId());
+    private static final DatasetId METADATA_DATASET_ID =
+            new ImmutableDatasetId(MetadataPrimaryIndexes.PROPERTIES_METADATA.getDatasetId());
 
     // shared between core and extension
     private IDatasetLifecycleManager datasetLifecycleManager;
@@ -167,7 +168,7 @@ public class MetadataNode implements IMetadataNode {
     @Override
     public void commitTransaction(JobId jobId) throws RemoteException, ACIDException {
         ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
-        transactionSubsystem.getTransactionManager().commitTransaction(txnCtx, new DatasetId(-1), -1);
+        transactionSubsystem.getTransactionManager().commitTransaction(txnCtx, DatasetId.NULL, -1);
     }
 
     @Override
@@ -175,7 +176,7 @@ public class MetadataNode implements IMetadataNode {
         try {
             ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId,
                     false);
-            transactionSubsystem.getTransactionManager().abortTransaction(txnCtx, new DatasetId(-1), -1);
+            transactionSubsystem.getTransactionManager().abortTransaction(txnCtx, DatasetId.NULL, -1);
         } catch (ACIDException e) {
             e.printStackTrace();
             throw e;
@@ -407,7 +408,7 @@ public class MetadataNode implements IMetadataNode {
 
     private void insertTupleIntoIndex(JobId jobId, IMetadataIndex metadataIndex, ITupleReference tuple)
             throws ACIDException, HyracksDataException, IndexException {
-        long resourceID = metadataIndex.getResourceID();
+        long resourceID = metadataIndex.getResourceId();
         String resourceName = metadataIndex.getFile().getRelativePath();
         ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.get(resourceName);
         try {
@@ -415,7 +416,7 @@ public class MetadataNode implements IMetadataNode {
 
             // prepare a Callback for logging
             IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID,
-                    metadataIndex, lsmIndex, IndexOperation.INSERT);
+                    metadataIndex, lsmIndex, Operation.INSERT);
 
             ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
 
@@ -435,16 +436,16 @@ public class MetadataNode implements IMetadataNode {
     }
 
     private IModificationOperationCallback createIndexModificationCallback(JobId jobId, long resourceId,
-            IMetadataIndex metadataIndex, ILSMIndex lsmIndex, IndexOperation indexOp) throws ACIDException {
+            IMetadataIndex metadataIndex, ILSMIndex lsmIndex, Operation indexOp) throws ACIDException {
         ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
 
         // Regardless of the index type (primary or secondary index), secondary index modification callback is given
         // This is still correct since metadata index operation doesn't require any lock from ConcurrentLockMgr and
         // The difference between primaryIndexModCallback and secondaryIndexModCallback is that primary index requires
         // locks and secondary index doesn't.
-        return new SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
+        return new SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId(),
                 metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
-                transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp, false);
+                transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
     }
 
     @Override
@@ -679,14 +680,14 @@ public class MetadataNode implements IMetadataNode {
 
     private void deleteTupleFromIndex(JobId jobId, IMetadataIndex metadataIndex, ITupleReference tuple)
             throws ACIDException, HyracksDataException, IndexException {
-        long resourceID = metadataIndex.getResourceID();
+        long resourceID = metadataIndex.getResourceId();
         String resourceName = metadataIndex.getFile().getRelativePath();
         ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.get(resourceName);
         try {
             datasetLifecycleManager.open(resourceName);
             // prepare a Callback for logging
             IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID,
-                    metadataIndex, lsmIndex, IndexOperation.DELETE);
+                    metadataIndex, lsmIndex, Operation.DELETE);
             ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
 
             ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataIndex.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataIndex.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataIndex.java
index 9eec64c..6c220af 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataIndex.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataIndex.java
@@ -75,9 +75,9 @@ public interface IMetadataIndex extends Serializable {
 
     public int getFileId();
 
-    public void setResourceID(long resourceID);
+    public void setResourceId(long resourceId);
 
-    public long getResourceID();
+    public long getResourceId();
 
     public DatasetId getDatasetId();
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 0410911..15a2783 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -364,7 +364,7 @@ public class MetadataBootstrap {
                     ioOpCallbackFactory.createIoOpCallback(), index.isPrimaryIndex(), null, null, null, null, true,
                     appContext.getStorageComponentProvider().getMetadataPageManagerFactory());
             lsmBtree.create();
-            resourceID = index.getResourceID();
+            resourceID = index.getResourceId();
             Resource localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits, comparatorFactories,
                     bloomFilterKeyFields, index.isPrimaryIndex(), index.getDatasetId().getId(),
                     metadataPartition.getPartitionId(), appContext.getMetadataMergePolicyFactory(),
@@ -385,7 +385,7 @@ public class MetadataBootstrap {
                         + " to intialize as a new instance. (WARNING: all data will be lost.)");
             }
             resourceID = resource.getId();
-            if (index.getResourceID() != resource.getId()) {
+            if (index.getResourceId() != resource.getId()) {
                 throw new HyracksDataException("Resource Id doesn't match expected metadata index resource id");
             }
             lsmBtree = (LSMBTree) dataLifecycleManager.get(file.getRelativePath());
@@ -402,7 +402,7 @@ public class MetadataBootstrap {
                 dataLifecycleManager.register(file.getRelativePath(), lsmBtree);
             }
         }
-        index.setResourceID(resourceID);
+        index.setResourceId(resourceID);
         index.setFile(file);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
index 80a670e..ee5c9f3 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ImmutableDatasetId;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -147,7 +148,7 @@ public class MetadataIndex implements IMetadataIndex {
             }
         }
 
-        this.datasetId = new DatasetId(indexImmutableProperties.getDatasetId());
+        this.datasetId = new ImmutableDatasetId(indexImmutableProperties.getDatasetId());
         this.isPrimaryIndex = isPrimaryIndex;
 
         //PrimaryKeyFieldIndexes
@@ -260,12 +261,12 @@ public class MetadataIndex implements IMetadataIndex {
     }
 
     @Override
-    public void setResourceID(long resourceID) {
+    public void setResourceId(long resourceID) {
         this.resourceId = resourceID;
     }
 
     @Override
-    public long getResourceID() {
+    public long getResourceId() {
         return resourceId;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 6901e1d..39ea54d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -89,6 +89,7 @@ import org.apache.asterix.runtime.operators.LSMTreeUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.utils.AppContextInfo;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
@@ -1092,9 +1093,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
 
             IModificationOperationCallbackFactory modificationCallbackFactory = temp
                     ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            primaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT, ResourceType.LSM_BTREE)
+                            primaryKeyFields, txnSubsystemProvider, Operation.UPSERT, ResourceType.LSM_BTREE)
                     : new UpsertOperationCallbackFactory(jobId, datasetId, primaryKeyFields, txnSubsystemProvider,
-                            IndexOperation.UPSERT, ResourceType.LSM_BTREE, dataset.hasMetaPart());
+                            Operation.UPSERT, ResourceType.LSM_BTREE);
 
             LockThenSearchOperationCallbackFactory searchCallbackFactory = new LockThenSearchOperationCallbackFactory(
                     jobId, datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
@@ -1288,11 +1289,10 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             IModificationOperationCallbackFactory modificationCallbackFactory = temp
                     ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(
                             ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(), datasetId,
-                            primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
+                            primaryKeyFields, txnSubsystemProvider, Operation.get(indexOp), ResourceType.LSM_BTREE)
                     : new PrimaryIndexModificationOperationCallbackFactory(
                             ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(), datasetId,
-                            primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
-                            dataset.hasMetaPart());
+                            primaryKeyFields, txnSubsystemProvider, Operation.get(indexOp), ResourceType.LSM_BTREE);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1486,10 +1486,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             int datasetId = dataset.getDatasetId();
             IModificationOperationCallbackFactory modificationCallbackFactory = temp
                     ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
+                            ResourceType.LSM_BTREE)
                     : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
-                            dataset.hasMetaPart());
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
+                            ResourceType.LSM_BTREE);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1643,10 +1644,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             int datasetId = dataset.getDatasetId();
             IModificationOperationCallbackFactory modificationCallbackFactory = temp
                     ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE)
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
+                            ResourceType.LSM_RTREE)
                     : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE,
-                            dataset.hasMetaPart());
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
+                            ResourceType.LSM_RTREE);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1852,11 +1854,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             int datasetId = dataset.getDatasetId();
             IModificationOperationCallbackFactory modificationCallbackFactory = temp
                     ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
                             ResourceType.LSM_INVERTED_INDEX)
                     : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_INVERTED_INDEX, dataset.hasMetaPart());
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp),
+                            ResourceType.LSM_INVERTED_INDEX);
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
             IIndexDataflowHelperFactory indexDataFlowFactory = dataset.getIndexDataflowHelperFactory(this,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 34faf63..ad75b6b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -60,6 +60,7 @@ import org.apache.asterix.metadata.utils.MetadataUtil;
 import org.apache.asterix.metadata.utils.RTreeDataflowHelperFactoryProvider;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
@@ -517,23 +518,26 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
             IStorageComponentProvider componentProvider, Index index, JobId jobId, IndexOperation op,
             int[] primaryKeyFields) throws AlgebricksException {
         if (getDatasetDetails().isTemp()) {
-            return new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(),
-                    primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), op, index.resourceType());
+            return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT
+                    ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(),
+                            primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
+                            index.resourceType())
+                    : NoOpOperationCallbackFactory.INSTANCE;
         } else if (index.isPrimaryIndex()) {
             return op == IndexOperation.UPSERT
                     ? new UpsertOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields,
-                            componentProvider.getTransactionSubsystemProvider(), op, index.resourceType(),
-                            hasMetaPart())
+                            componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
+                            index.resourceType())
                     : op == IndexOperation.DELETE || op == IndexOperation.INSERT
                             ? new PrimaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(),
-                                    primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), op,
-                                    index.resourceType(), hasMetaPart())
+                                    primaryKeyFields, componentProvider.getTransactionSubsystemProvider(),
+                                    Operation.get(op), index.resourceType())
                             : NoOpOperationCallbackFactory.INSTANCE;
         } else {
             return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT
                     ? new SecondaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields,
-                            componentProvider.getTransactionSubsystemProvider(), op, index.resourceType(),
-                            hasMetaPart())
+                            componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
+                            index.resourceType())
                     : NoOpOperationCallbackFactory.INSTANCE;
         }
     }
@@ -574,10 +578,10 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
     }
 
     public IPushRuntimeFactory getCommitRuntimeFactory(JobId jobId, int[] primaryKeyFields,
-            MetadataProvider metadataProvider, int upsertVarIdx, int[] datasetPartitions, boolean isSink) {
+            MetadataProvider metadataProvider, int[] datasetPartitions, boolean isSink) {
         return new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
-                metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx,
-                datasetPartitions, isSink);
+                metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), datasetPartitions,
+                isSink);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index f02496a..dd40b04 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -462,7 +462,6 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
                 switch (remoteLog.getLogType()) {
                     case LogType.UPDATE:
                     case LogType.ENTITY_COMMIT:
-                    case LogType.UPSERT_ENTITY_COMMIT:
                         //if the log partition belongs to a partitions hosted on this node, replicate it
                         if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
                             logManager.log(remoteLog);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
index 7ec5691..b182add 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -55,7 +55,7 @@ public class JobEventListenerFactory implements IJobletEventListenerFactory {
                             .getApplicationContext()).getTransactionSubsystem().getTransactionManager();
                     ITransactionContext txnContext = txnManager.getTransactionContext(jobId, false);
                     txnContext.setWriteTxn(transactionalWrite);
-                    txnManager.completedTransaction(txnContext, new DatasetId(-1), -1,
+                    txnManager.completedTransaction(txnContext, DatasetId.NULL, -1,
                             !(jobStatus == JobStatus.FAILURE));
                 } catch (ACIDException e) {
                     throw new Error(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
index ac1895b..c194d64 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
@@ -59,7 +59,7 @@ public class MultiTransactionJobletEventListenerFactory implements IJobletEventL
                     for (JobId jobId : jobIds) {
                         ITransactionContext txnContext = txnManager.getTransactionContext(jobId, false);
                         txnContext.setWriteTxn(transactionalWrite);
-                        txnManager.completedTransaction(txnContext, new DatasetId(-1), -1,
+                        txnManager.completedTransaction(txnContext, DatasetId.NULL, -1,
                                 !(jobStatus == JobStatus.FAILURE));
                     }
                 } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/04075533/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 704ee52..1f18c97 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -31,6 +31,8 @@ import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
+import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallback;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -49,7 +51,6 @@ import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
 import org.apache.hyracks.storage.am.common.api.IIndexCursor;
-import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback.Operation;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.common.api.IndexException;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
@@ -88,6 +89,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
     private LockThenSearchOperationCallback searchCallback;
     private IFrameOperationCallback frameOpCallback;
     private final IFrameOperationCallbackFactory frameOpCallbackFactory;
+    private AbstractIndexModificationOperationCallback abstractModCallback;
 
     public LSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
             int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys,
@@ -149,9 +151,11 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
             appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
             modCallback = opDesc.getModificationOpCallbackFactory()
                     .createModificationOperationCallback(indexHelper.getResource(), ctx, this);
+            abstractModCallback = (AbstractIndexModificationOperationCallback) modCallback;
             searchCallback = (LockThenSearchOperationCallback) opDesc.getSearchOpCallbackFactory()
                     .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
-            indexAccessor = index.createAccessor(modCallback, searchCallback);
+            indexAccessor = index.createAccessor(abstractModCallback, searchCallback);
+
             cursor = indexAccessor.createSearchCursor(false);
             frameTuple = new FrameTupleReference();
             IAppRuntimeContext appCtx =
@@ -172,12 +176,12 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
     }
 
     private void writeOutput(int tupleIndex, boolean recordWasInserted, boolean recordWasDeleted) throws IOException {
-        frameTuple.reset(accessor, tupleIndex);
-        for (int i = 0; i < frameTuple.getFieldCount(); i++) {
-            dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
-            tb.addFieldEndOffset();
-        }
         if (recordWasInserted || recordWasDeleted) {
+            frameTuple.reset(accessor, tupleIndex);
+            for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+                dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+                tb.addFieldEndOffset();
+            }
             FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
         } else {
             try {
@@ -233,12 +237,15 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
                                 prevTuple.getFieldLength(filterFieldIndex));
                         tb.addFieldEndOffset();
                     }
-                    modCallback.setOp(Operation.DELETE);
-                    if (firstModification) {
-                        lsmAccessor.delete(prevTuple);
-                        firstModification = false;
-                    } else {
-                        lsmAccessor.forceDelete(prevTuple);
+                    if (isNull(tuple, numOfPrimaryKeys)) {
+                        // Only delete if it is a delete and not upsert
+                        abstractModCallback.setOp(Operation.DELETE);
+                        if (firstModification) {
+                            lsmAccessor.delete(prevTuple);
+                            firstModification = false;
+                        } else {
+                            lsmAccessor.forceDelete(prevTuple);
+                        }
                     }
                 } else {
                     prevTuple = null;
@@ -253,12 +260,12 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
                     cursor.reset();
                 }
                 if (!isNull(tuple, numOfPrimaryKeys)) {
-                    modCallback.setOp(Operation.INSERT);
+                    abstractModCallback.setOp(Operation.UPSERT);
                     if (firstModification) {
-                        lsmAccessor.insert(tuple);
+                        lsmAccessor.upsert(tuple);
                         firstModification = false;
                     } else {
-                        lsmAccessor.forceInsert(tuple);
+                        lsmAccessor.forceUpsert(tuple);
                     }
                     recordWasInserted = true;
                 }


Mime
View raw message