asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject asterixdb git commit: Enable commit runtime extension
Date Sun, 05 Feb 2017 17:26:47 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master 56189fb11 -> 910303b46


Enable commit runtime extension

Change-Id: I98083ea5e93cb5f45d92c5dfbacfee1020fad57a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1485
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/910303b4
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/910303b4
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/910303b4

Branch: refs/heads/master
Commit: 910303b4634f7ced6bde7dc2e6525db880975062
Parents: 56189fb
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Authored: Fri Feb 3 23:02:25 2017 -0800
Committer: abdullah alamoudi <bamousaa@gmail.com>
Committed: Sun Feb 5 09:24:38 2017 -0800

----------------------------------------------------------------------
 asterixdb/asterix-algebra/pom.xml               |   4 -
 .../operators/physical/CommitPOperator.java     |  30 ++-
 .../operators/physical/CommitRuntime.java       | 187 ------------------
 .../physical/CommitRuntimeFactory.java          |  69 -------
 .../operators/physical/UpsertCommitRuntime.java |  54 ------
 .../rules/SetupCommitExtensionOpRule.java       |  34 ++--
 .../app/bootstrap/TestNodeController.java       |   2 +-
 .../external/library/java/JObjectAccessors.java |  27 ++-
 .../asterix/metadata/entities/Dataset.java      |  25 +--
 .../printers/adm/ABooleanPrinterFactory.java    |   4 +-
 .../adm/ShortWithoutTypeInfoPrinterFactory.java |   4 +-
 .../printers/csv/ABooleanPrinterFactory.java    |   4 +-
 .../json/clean/ABooleanPrinterFactory.java      |   4 +-
 .../json/lossless/ABooleanPrinterFactory.java   |   4 +-
 .../serde/ABooleanSerializerDeserializer.java   |   8 +-
 .../evaluators/common/GramTokensEvaluator.java  |   4 +-
 .../evaluators/functions/AndDescriptor.java     |  14 +-
 ...EditDistanceStringIsFilterableEvaluator.java |  18 +-
 .../evaluators/functions/NotDescriptor.java     |   8 +-
 .../evaluators/functions/OrDescriptor.java      |  14 +-
 asterixdb/asterix-transactions/pom.xml          |  21 ++-
 .../management/runtime/CommitRuntime.java       | 188 +++++++++++++++++++
 .../runtime/CommitRuntimeFactory.java           |  69 +++++++
 .../management/runtime/UpsertCommitRuntime.java |  54 ++++++
 .../data/std/primitive/BooleanPointable.java    |   2 +-
 25 files changed, 416 insertions(+), 436 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-algebra/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/pom.xml b/asterixdb/asterix-algebra/pom.xml
index 8588381..5a723b3 100644
--- a/asterixdb/asterix-algebra/pom.xml
+++ b/asterixdb/asterix-algebra/pom.xml
@@ -226,10 +226,6 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-storage-am-bloomfilter</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hyracks</groupId>
       <artifactId>algebricks-runtime</artifactId>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
index 1c01c40..d0cee55 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -36,6 +37,7 @@ import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalProperties
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.io.FileSplit;
 
@@ -43,20 +45,16 @@ public class CommitPOperator extends AbstractPhysicalOperator {
 
     private final List<LogicalVariable> primaryKeyLogicalVars;
     private final JobId jobId;
-    private final int datasetId;
-    private final String dataverse;
-    private final String dataset;
+    private final Dataset dataset;
     private final LogicalVariable upsertVar;
     private final boolean isSink;
 
-    public CommitPOperator(JobId jobId, String dataverse, String dataset, int datasetId,
-            List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar, boolean isSink) {
+    public CommitPOperator(JobId jobId, Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars,
+            LogicalVariable upsertVar, boolean isSink) {
         this.jobId = jobId;
-        this.datasetId = datasetId;
+        this.dataset = dataset;
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
         this.upsertVar = upsertVar;
-        this.dataverse = dataverse;
-        this.dataset = dataset;
         this.isSink = isSink;
     }
 
@@ -86,28 +84,26 @@ public class CommitPOperator extends AbstractPhysicalOperator {
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
-        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
-                context);
+        RecordDescriptor recDesc =
+                JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
         int[] primaryKeyFields = JobGenHelper.variablesToFieldIndexes(primaryKeyLogicalVars, inputSchemas[0]);
 
         //get dataset splits
         FileSplit[] splitsForDataset = metadataProvider.splitsForDataset(metadataProvider.getMetadataTxnContext(),
-                dataverse, dataset, dataset, metadataProvider.isTemporaryDatasetWriteJob());
+                dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName(),
+                metadataProvider.isTemporaryDatasetWriteJob());
         int[] datasetPartitions = new int[splitsForDataset.length];
         for (int i = 0; i < splitsForDataset.length; i++) {
             datasetPartitions[i] = i;
         }
-
         int upsertVarIdx = -1;
-        CommitRuntimeFactory runtime = null;
         if (upsertVar != null) {
             upsertVarIdx = inputSchemas[0].findVariable(upsertVar);
         }
-        runtime = new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
-                metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx,
-                datasetPartitions, isSink);
+        IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(jobId, primaryKeyFields, metadataProvider,
+                upsertVarIdx, datasetPartitions, isSink);
         builder.contributeMicroOperator(op, runtime, recDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
deleted file mode 100644
index 63a91ac..0000000
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.algebra.operators.physical;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.ILogManager;
-import org.apache.asterix.common.transactions.ILogMarkerCallback;
-import org.apache.asterix.common.transactions.ITransactionContext;
-import org.apache.asterix.common.transactions.ITransactionManager;
-import org.apache.asterix.common.transactions.JobId;
-import org.apache.asterix.common.transactions.LogRecord;
-import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.common.utils.TransactionUtil;
-import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.HyracksConstants;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
-import org.apache.hyracks.dataflow.common.utils.TaskUtil;
-import org.apache.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
-
-public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
-
-    private final static long SEED = 0L;
-
-    protected final ITransactionManager transactionManager;
-    protected final ILogManager logMgr;
-    protected final JobId jobId;
-    protected final int datasetId;
-    protected final int[] primaryKeyFields;
-    protected final boolean isTemporaryDatasetWriteJob;
-    protected final boolean isWriteTransaction;
-    protected final long[] longHashes;
-    protected final IHyracksTaskContext ctx;
-    protected final int resourcePartition;
-    protected ITransactionContext transactionContext;
-    protected LogRecord logRecord;
-    protected final boolean isSink;
-
-    public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
-            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, boolean isSink) {
-        this.ctx = ctx;
-        IAppRuntimeContext runtimeCtx =
-                (IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
-        this.transactionManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
-        this.logMgr = runtimeCtx.getTransactionSubsystem().getLogManager();
-        this.jobId = jobId;
-        this.datasetId = datasetId;
-        this.primaryKeyFields = primaryKeyFields;
-        this.tRef = new FrameTupleReference();
-        this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob;
-        this.isWriteTransaction = isWriteTransaction;
-        this.resourcePartition = resourcePartition;
-        this.isSink = isSink;
-        longHashes = new long[2];
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        try {
-            transactionContext = transactionManager.getTransactionContext(jobId, false);
-            transactionContext.setWriteTxn(isWriteTransaction);
-            ILogMarkerCallback callback =
-                    TaskUtil.<ILogMarkerCallback> get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
-            logRecord = new LogRecord(callback);
-            if (isSink) {
-                return;
-            }
-            initAccessAppend(ctx);
-            writer.open();
-        } catch (ACIDException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        tAccess.reset(buffer);
-        int nTuple = tAccess.getTupleCount();
-        for (int t = 0; t < nTuple; t++) {
-            if (isTemporaryDatasetWriteJob) {
-                /**
-                 * This "if branch" is for writes over temporary datasets. A temporary dataset does not require any lock
-                 * and does not generate any write-ahead update and commit log but generates flush log and job commit
-                 * log. However, a temporary dataset still MUST guarantee no-steal policy so that this notification call
-                 * should be delivered to PrimaryIndexOptracker and used correctly in order to decrement number of
-                 * active operation count of PrimaryIndexOptracker. By maintaining the count correctly and only allowing
-                 * flushing when the count is 0, it can guarantee the no-steal policy for temporary datasets, too.
-                 */
-                transactionContext.notifyOptracker(false);
-            } else {
-                tRef.reset(tAccess, t);
-                try {
-                    formLogRecord(buffer, t);
-                    logMgr.log(logRecord);
-                    if (!isSink) {
-                        appendTupleToFrame(t);
-                    }
-                } catch (ACIDException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-        }
-        VSizeFrame message = TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
-        if (message != null
-                && MessagingFrameTupleAppender.getMessageType(message) == MessagingFrameTupleAppender.MARKER_MESSAGE) {
-            try {
-                formMarkerLogRecords(message.getBuffer());
-                logMgr.log(logRecord);
-            } catch (ACIDException e) {
-                throw new HyracksDataException(e);
-            }
-            message.reset();
-            message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
-            message.getBuffer().flip();
-        }
-    }
-
-    private void formMarkerLogRecords(ByteBuffer marker) {
-        TransactionUtil.formMarkerLogRecord(logRecord, transactionContext, datasetId, resourcePartition, marker);
-    }
-
-    protected void formLogRecord(ByteBuffer buffer, int t) {
-        int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields);
-        TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, tRef,
-                primaryKeyFields, resourcePartition, LogType.ENTITY_COMMIT);
-    }
-
-    protected int computePrimaryKeyHashValue(ITupleReference tuple, int[] primaryKeyFields) {
-        MurmurHash128Bit.hash3_x64_128(tuple, primaryKeyFields, SEED, longHashes);
-        return Math.abs((int) longHashes[0]);
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        failed = true;
-        if (isSink) {
-            return;
-        }
-        writer.fail();
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        if (isSink) {
-            return;
-        }
-        flushIfNotFailed();
-        writer.close();
-        appender.reset(frame, true);
-    }
-
-    @Override
-    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
-        this.inputRecordDesc = recordDescriptor;
-        this.tAccess = new FrameTupleAccessor(inputRecordDesc);
-    }
-
-    @Override
-    public void flush() throws HyracksDataException {
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
deleted file mode 100644
index 767d864..0000000
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.algebra.operators.physical;
-
-import org.apache.asterix.common.transactions.JobId;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class CommitRuntimeFactory implements IPushRuntimeFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    private final JobId jobId;
-    private final int datasetId;
-    private final int[] primaryKeyFields;
-    private final boolean isTemporaryDatasetWriteJob;
-    private final boolean isWriteTransaction;
-    private final int upsertVarIdx;
-    private int[] datasetPartitions;
-    private final boolean isSink;
-
-    public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob,
-            boolean isWriteTransaction, int upsertVarIdx, int[] datasetPartitions, boolean isSink) {
-        this.jobId = jobId;
-        this.datasetId = datasetId;
-        this.primaryKeyFields = primaryKeyFields;
-        this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob;
-        this.isWriteTransaction = isWriteTransaction;
-        this.upsertVarIdx = upsertVarIdx;
-        this.datasetPartitions = datasetPartitions;
-        this.isSink = isSink;
-    }
-
-    @Override
-    public String toString() {
-        return "commit";
-    }
-
-    @Override
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
-        if (upsertVarIdx >= 0) {
-            return new UpsertCommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob,
-                    isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()],
-                    upsertVarIdx, isSink);
-        } else {
-            return new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob,
-                    isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
deleted file mode 100644
index 53e0f62..0000000
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.algebra.operators.physical;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.common.transactions.JobId;
-import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.common.utils.TransactionUtil;
-import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public class UpsertCommitRuntime extends CommitRuntime {
-    private final int upsertIdx;
-
-    public UpsertCommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
-            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, int upsertIdx,
-            boolean isSink) {
-        super(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, isWriteTransaction,
-                resourcePartition, isSink);
-        this.upsertIdx = upsertIdx;
-    }
-
-    @Override
-    protected void formLogRecord(ByteBuffer buffer, int t) {
-        boolean isNull = ABooleanSerializerDeserializer.getBoolean(buffer.array(), tAccess.getFieldSlotsLength()
-                + tAccess.getTupleStartOffset(t) + tAccess.getFieldStartOffset(t, upsertIdx) + 1);
-        if (isNull) {
-            // Previous record not found (insert)
-            super.formLogRecord(buffer, t);
-        } else {
-            // Previous record found (delete + insert)
-            int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields);
-            TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, tRef,
-                    primaryKeyFields, resourcePartition, LogType.UPSERT_ENTITY_COMMIT);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
index 5bafe83..9b442ae 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
@@ -27,6 +27,7 @@ import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -70,9 +71,7 @@ public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule {
         boolean isSink = ((CommitOperator) eOp.getDelegate()).isSink();
 
         List<Mutable<ILogicalExpression>> primaryKeyExprs = null;
-        int datasetId = 0;
-        String dataverse = null;
-        String datasetName = null;
+        Dataset dataset = null;
         AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
         LogicalVariable upsertVar = null;
         while (descendantOp != null) {
@@ -80,29 +79,19 @@ public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule {
                 IndexInsertDeleteUpsertOperator operator = (IndexInsertDeleteUpsertOperator) descendantOp;
                 if (!operator.isBulkload() && operator.getPrevSecondaryKeyExprs() == null) {
                     primaryKeyExprs = operator.getPrimaryKeyExpressions();
-                    datasetId = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset()
-                            .getDatasetId();
-                    dataverse = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset()
-                            .getDataverseName();
-                    datasetName = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset()
-                            .getDatasetName();
+                    dataset = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset();
                     break;
                 }
             } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE_UPSERT) {
                 InsertDeleteUpsertOperator insertDeleteUpsertOperator = (InsertDeleteUpsertOperator) descendantOp;
                 if (!insertDeleteUpsertOperator.isBulkload()) {
                     primaryKeyExprs = insertDeleteUpsertOperator.getPrimaryKeyExpressions();
-                    datasetId = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
-                            .getDatasetId();
-                    dataverse = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
-                            .getDataverseName();
-                    datasetName = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
-                            .getDatasetName();
+                    dataset = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset();
                     if (insertDeleteUpsertOperator.getOperation() == Kind.UPSERT) {
                         //we need to add a function that checks if previous record was found
                         upsertVar = context.newVar();
-                        AbstractFunctionCallExpression orFunc = new ScalarFunctionCallExpression(
-                                FunctionUtil.getFunctionInfo(BuiltinFunctions.OR));
+                        AbstractFunctionCallExpression orFunc =
+                                new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.OR));
                         // is new value missing? -> this means that the expected operation is delete
                         AbstractFunctionCallExpression isNewMissingFunc = new ScalarFunctionCallExpression(
                                 FunctionUtil.getFunctionInfo(BuiltinFunctions.IS_MISSING));
@@ -116,11 +105,10 @@ public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule {
                         orFunc.getArguments().add(new MutableObject<ILogicalExpression>(isNewMissingFunc));
 
                         // AssignOperator puts in the cast var the casted record
-                        AssignOperator upsertFlagAssign = new AssignOperator(upsertVar,
-                                new MutableObject<ILogicalExpression>(orFunc));
+                        AssignOperator upsertFlagAssign =
+                                new AssignOperator(upsertVar, new MutableObject<ILogicalExpression>(orFunc));
                         // Connect the current top of the plan to the cast operator
-                        upsertFlagAssign.getInputs()
-                                .add(new MutableObject<ILogicalOperator>(eOp.getInputs().get(0).getValue()));
+                        upsertFlagAssign.getInputs().add(new MutableObject<>(eOp.getInputs().get(0).getValue()));
                         eOp.getInputs().clear();
                         eOp.getInputs().add(new MutableObject<ILogicalOperator>(upsertFlagAssign));
                         context.computeAndSetTypeEnvironmentForOperator(upsertFlagAssign);
@@ -151,8 +139,8 @@ public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule {
 
         //create the logical and physical operator
         CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, upsertVar, isSink);
-        CommitPOperator commitPOperator = new CommitPOperator(jobId, dataverse, datasetName, datasetId,
-                primaryKeyLogicalVars, upsertVar, isSink);
+        CommitPOperator commitPOperator =
+                new CommitPOperator(jobId, dataset, primaryKeyLogicalVars, upsertVar, isSink);
         commitOperator.setPhysicalOperator(commitPOperator);
 
         //create ExtensionOperator and put the commitOperator in it.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index bc18045..cc12f36 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.logging.Logger;
 
-import org.apache.asterix.algebra.operators.physical.CommitRuntime;
 import org.apache.asterix.app.external.TestLibrarian;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.app.nc.TransactionSubsystem;
@@ -57,6 +56,7 @@ import org.apache.asterix.test.runtime.ExecutionTestUtil;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
 import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadataFactory;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import org.apache.asterix.transaction.management.runtime.CommitRuntime;
 import org.apache.asterix.transaction.management.service.logging.LogReader;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
index 1f1c139..f8755b4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
@@ -18,6 +18,12 @@
  */
 package org.apache.asterix.external.library.java;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.List;
+
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -87,12 +93,6 @@ import org.apache.asterix.om.util.container.IObjectPool;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.util.string.UTF8StringReader;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.LinkedHashMap;
-import java.util.List;
-
 public class JObjectAccessors {
 
     public static IJObjectAccessor createFlatJObjectAccessor(ATypeTag aTypeTag) {
@@ -455,7 +455,7 @@ public class JObjectAccessors {
             this.typeInfo = new TypeInfo(objectPool, null, null);
             this.jObjects = new IJObject[recordType.getFieldNames().length];
             this.jRecord = new JRecord(recordType, jObjects);
-            this.openFields = new LinkedHashMap<String, IJObject>();
+            this.openFields = new LinkedHashMap<>();
         }
 
         @Override
@@ -473,12 +473,11 @@ public class JObjectAccessors {
                 for (IVisitablePointable fieldPointable : fieldPointables) {
                     closedPart = index < recordType.getFieldTypes().length;
                     IVisitablePointable tt = fieldTypeTags.get(index);
-                    ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
-                            .deserialize(tt.getByteArray()[tt.getStartOffset()]);
+                    ATypeTag typeTag =
+                            EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(tt.getByteArray()[tt.getStartOffset()]);
                     IAType fieldType;
-                    fieldType = closedPart ?
-                            recordType.getFieldTypes()[index] :
-                            TypeTagUtil.getBuiltinTypeByTag(typeTag);
+                    fieldType =
+                            closedPart ? recordType.getFieldTypes()[index] : TypeTagUtil.getBuiltinTypeByTag(typeTag);
                     IVisitablePointable fieldName = fieldNames.get(index);
                     typeInfo.reset(fieldType, typeTag);
                     switch (typeTag) {
@@ -491,8 +490,8 @@ public class JObjectAccessors {
                                 // value is null
                                 fieldObject = null;
                             } else {
-                                fieldObject = pointableVisitor
-                                        .visit((AListVisitablePointable) fieldPointable, typeInfo);
+                                fieldObject =
+                                        pointableVisitor.visit((AListVisitablePointable) fieldPointable, typeInfo);
                             }
                             break;
                         case ANY:

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 4ebf055..55cd304 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -22,6 +22,7 @@ package org.apache.asterix.metadata.entities;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -66,11 +67,13 @@ import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOpera
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
+import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
@@ -241,13 +244,8 @@ public class Dataset implements IMetadataEntity<Dataset> {
             return false;
         }
         Dataset otherDataset = (Dataset) other;
-        if (!otherDataset.dataverseName.equals(dataverseName)) {
-            return false;
-        }
-        if (!otherDataset.datasetName.equals(datasetName)) {
-            return false;
-        }
-        return true;
+        return Objects.equals(dataverseName, otherDataset.dataverseName)
+                && Objects.equals(datasetName, otherDataset.datasetName);
     }
 
     public boolean allow(ILogicalOperator topOp, byte operation) {//NOSONAR: this method is meant to be extended
@@ -567,10 +565,13 @@ public class Dataset implements IMetadataEntity<Dataset> {
 
     @Override
     public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((datasetName == null) ? 0 : datasetName.hashCode());
-        result = prime * result + ((dataverseName == null) ? 0 : dataverseName.hashCode());
-        return result;
+        return Objects.hash(dataverseName, datasetName);
+    }
+
+    public IPushRuntimeFactory getCommitRuntimeFactory(JobId jobId, int[] primaryKeyFields,
+            MetadataProvider metadataProvider, int upsertVarIdx, int[] datasetPartitions, boolean isSink) {
+        return new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
+                metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx,
+                datasetPartitions, isSink);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ABooleanPrinterFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ABooleanPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ABooleanPrinterFactory.java
index 511ea9f..0a2f166 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ABooleanPrinterFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ABooleanPrinterFactory.java
@@ -29,8 +29,8 @@ public class ABooleanPrinterFactory implements IPrinterFactory {
     private static final long serialVersionUID = 1L;
     public static final ABooleanPrinterFactory INSTANCE = new ABooleanPrinterFactory();
 
-    public static final IPrinter PRINTER = (byte[] b, int s, int l, PrintStream ps) -> ps
-            .print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
+    public static final IPrinter PRINTER =
+            (byte[] b, int s, int l, PrintStream ps) -> ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
 
     @Override
     public IPrinter createPrinter() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ShortWithoutTypeInfoPrinterFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ShortWithoutTypeInfoPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ShortWithoutTypeInfoPrinterFactory.java
index 666fa0a..2a878ac 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ShortWithoutTypeInfoPrinterFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ShortWithoutTypeInfoPrinterFactory.java
@@ -29,8 +29,8 @@ public class ShortWithoutTypeInfoPrinterFactory implements IPrinterFactory {
     private static final long serialVersionUID = 1L;
     public static final ShortWithoutTypeInfoPrinterFactory INSTANCE = new ShortWithoutTypeInfoPrinterFactory();
 
-    public static final IPrinter PRINTER = (byte[] b, int s, int l, PrintStream ps) -> ps
-            .print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
+    public static final IPrinter PRINTER =
+            (byte[] b, int s, int l, PrintStream ps) -> ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
 
     @Override
     public IPrinter createPrinter() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ABooleanPrinterFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ABooleanPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ABooleanPrinterFactory.java
index 4aa6ccd..c500e86 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ABooleanPrinterFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ABooleanPrinterFactory.java
@@ -29,8 +29,8 @@ public class ABooleanPrinterFactory implements IPrinterFactory {
     private static final long serialVersionUID = 1L;
     public static final ABooleanPrinterFactory INSTANCE = new ABooleanPrinterFactory();
 
-    public static final IPrinter PRINTER = (byte[] b, int s, int l, PrintStream ps) -> ps
-            .print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
+    public static final IPrinter PRINTER =
+            (byte[] b, int s, int l, PrintStream ps) -> ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
 
     @Override
     public IPrinter createPrinter() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/ABooleanPrinterFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/ABooleanPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/ABooleanPrinterFactory.java
index ebc09a0..aa6fcbe 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/ABooleanPrinterFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/ABooleanPrinterFactory.java
@@ -29,8 +29,8 @@ public class ABooleanPrinterFactory implements IPrinterFactory {
     private static final long serialVersionUID = 1L;
     public static final ABooleanPrinterFactory INSTANCE = new ABooleanPrinterFactory();
 
-    public static final IPrinter PRINTER = (byte[] b, int s, int l, PrintStream ps) -> ps
-            .print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
+    public static final IPrinter PRINTER =
+            (byte[] b, int s, int l, PrintStream ps) -> ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
 
     @Override
     public IPrinter createPrinter() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/lossless/ABooleanPrinterFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/lossless/ABooleanPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/lossless/ABooleanPrinterFactory.java
index b65897b..959c4ad 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/lossless/ABooleanPrinterFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/lossless/ABooleanPrinterFactory.java
@@ -29,8 +29,8 @@ public class ABooleanPrinterFactory implements IPrinterFactory {
     private static final long serialVersionUID = 1L;
     public static final ABooleanPrinterFactory INSTANCE = new ABooleanPrinterFactory();
 
-    public static final IPrinter PRINTER = (byte[] b, int s, int l, PrintStream ps) ->
-        ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
+    public static final IPrinter PRINTER =
+            (byte[] b, int s, int l, PrintStream ps) -> ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
 
     @Override
     public IPrinter createPrinter() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ABooleanSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ABooleanSerializerDeserializer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ABooleanSerializerDeserializer.java
index 227c2cd..7d6a078 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ABooleanSerializerDeserializer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ABooleanSerializerDeserializer.java
@@ -29,7 +29,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 public class ABooleanSerializerDeserializer implements ISerializerDeserializer<ABoolean> {
 
     private static final long serialVersionUID = 1L;
-
     public static final ABooleanSerializerDeserializer INSTANCE = new ABooleanSerializerDeserializer();
 
     private ABooleanSerializerDeserializer() {
@@ -54,11 +53,6 @@ public class ABooleanSerializerDeserializer implements ISerializerDeserializer<A
     }
 
     public static boolean getBoolean(byte[] bytes, int offset) {
-        if (bytes[offset] == 0) {
-            return false;
-        } else {
-            return true;
-        }
+        return bytes[offset] != 0;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/GramTokensEvaluator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/GramTokensEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/GramTokensEvaluator.java
index d9cfc67..ef727c9 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/GramTokensEvaluator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/GramTokensEvaluator.java
@@ -22,6 +22,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.asterix.builders.OrderedListBuilder;
+import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.types.AOrderedListType;
 import org.apache.asterix.om.types.BuiltinType;
@@ -31,7 +32,6 @@ import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.BooleanPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -75,7 +75,7 @@ public class GramTokensEvaluator implements IScalarEvaluator {
         int gramLength = ATypeHierarchy.getIntegerValue(BuiltinFunctions.GRAM_TOKENS.getName(), 1,
                 gramLengthArg.getByteArray(), gramLengthArg.getStartOffset());
         tokenizer.setGramlength(gramLength);
-        boolean prePost = BooleanPointable.getBoolean(prePostArg.getByteArray(),
+        boolean prePost = ABooleanSerializerDeserializer.getBoolean(prePostArg.getByteArray(),
                 prePostArg.getStartOffset() + typeIndicatorSize);
         tokenizer.setPrePost(prePost);
         tokenizer.reset(stringArg.getByteArray(), stringArg.getStartOffset(), stringArg.getLength());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AndDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AndDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AndDescriptor.java
index 9fd5dc4..e9f9c9e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AndDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AndDescriptor.java
@@ -74,14 +74,14 @@ public class AndDescriptor extends AbstractScalarFunctionDynamicDescriptor {
 
                 return new IScalarEvaluator() {
                     @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ABoolean> booleanSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+                    private ISerializerDeserializer<ABoolean> booleanSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABOOLEAN);
                     @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
+                    private ISerializerDeserializer<ANull> nullSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
                     @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AMissing> missingSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AMISSING);
+                    private ISerializerDeserializer<AMissing> missingSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING);
 
                     @Override
                     public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
@@ -111,7 +111,7 @@ public class AndDescriptor extends AbstractScalarFunctionDynamicDescriptor {
                                         ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
                             }
                             boolean argResult = ABooleanSerializerDeserializer.getBoolean(bytes, offset + 1);
-                            if (! argResult) {
+                            if (!argResult) {
                                 // anything AND FALSE = FALSE
                                 booleanSerde.serialize(ABoolean.FALSE, out);
                                 result.set(resultStorage);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/EditDistanceStringIsFilterableEvaluator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/EditDistanceStringIsFilterableEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/EditDistanceStringIsFilterableEvaluator.java
index f6c8c4f..0509f51 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/EditDistanceStringIsFilterableEvaluator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/EditDistanceStringIsFilterableEvaluator.java
@@ -22,6 +22,7 @@ package org.apache.asterix.runtime.evaluators.functions;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.om.base.ABoolean;
 import org.apache.asterix.om.functions.BuiltinFunctions;
@@ -35,7 +36,6 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.BooleanPointable;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
@@ -56,8 +56,8 @@ public class EditDistanceStringIsFilterableEvaluator implements IScalarEvaluator
     protected final IScalarEvaluator usePrePostEval;
 
     @SuppressWarnings("unchecked")
-    private final ISerializerDeserializer<ABoolean> booleanSerde = SerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+    private final ISerializerDeserializer<ABoolean> booleanSerde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABOOLEAN);
 
     private final UTF8StringPointable utf8Ptr = new UTF8StringPointable();
 
@@ -88,14 +88,12 @@ public class EditDistanceStringIsFilterableEvaluator implements IScalarEvaluator
         int strLen = utf8Ptr.getStringLength();
 
         // Check type and extract edit-distance threshold.
-        long edThresh = ATypeHierarchy.getIntegerValue(
-                BuiltinFunctions.EDIT_DISTANCE_LIST_IS_FILTERABLE.getName(), 1, edThreshPtr.getByteArray(),
-                edThreshPtr.getStartOffset());
+        long edThresh = ATypeHierarchy.getIntegerValue(BuiltinFunctions.EDIT_DISTANCE_LIST_IS_FILTERABLE.getName(), 1,
+                edThreshPtr.getByteArray(), edThreshPtr.getStartOffset());
 
         // Check type and extract gram length.
-        long gramLen = ATypeHierarchy.getIntegerValue(
-                BuiltinFunctions.EDIT_DISTANCE_LIST_IS_FILTERABLE.getName(), 2, gramLenPtr.getByteArray(),
-                gramLenPtr.getStartOffset());
+        long gramLen = ATypeHierarchy.getIntegerValue(BuiltinFunctions.EDIT_DISTANCE_LIST_IS_FILTERABLE.getName(), 2,
+                gramLenPtr.getByteArray(), gramLenPtr.getStartOffset());
 
         // Check type and extract usePrePost flag.
         typeTag = usePrePostPtr.getByteArray()[usePrePostPtr.getStartOffset()];
@@ -103,7 +101,7 @@ public class EditDistanceStringIsFilterableEvaluator implements IScalarEvaluator
             throw new TypeMismatchException(BuiltinFunctions.EDIT_DISTANCE_STRING_IS_FILTERABLE, 3, typeTag,
                     ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
         }
-        boolean usePrePost = BooleanPointable.getBoolean(usePrePostPtr.getByteArray(),
+        boolean usePrePost = ABooleanSerializerDeserializer.getBoolean(usePrePostPtr.getByteArray(),
                 usePrePostPtr.getStartOffset() + 1);
 
         // Compute result.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/NotDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/NotDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/NotDescriptor.java
index 32263ea..13037a9 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/NotDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/NotDescriptor.java
@@ -71,8 +71,8 @@ public class NotDescriptor extends AbstractScalarFunctionDynamicDescriptor {
                     private IScalarEvaluator eval = args[0].createScalarEvaluator(ctx);
 
                     @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ABoolean> booleanSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+                    private ISerializerDeserializer<ABoolean> booleanSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABOOLEAN);
 
                     @Override
                     public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
@@ -86,8 +86,8 @@ public class NotDescriptor extends AbstractScalarFunctionDynamicDescriptor {
                             ABoolean aResult = argRes ? ABoolean.FALSE : ABoolean.TRUE;
                             booleanSerde.serialize(aResult, out);
                         } else {
-                            throw new TypeMismatchException(getIdentifier(), 0,
-                                    bytes[offset], ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
+                            throw new TypeMismatchException(getIdentifier(), 0, bytes[offset],
+                                    ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
                         }
                         result.set(resultStorage);
                     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/OrDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/OrDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/OrDescriptor.java
index c7a608a..7aea25c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/OrDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/OrDescriptor.java
@@ -74,14 +74,14 @@ public class OrDescriptor extends AbstractScalarFunctionDynamicDescriptor {
                     private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
                     private DataOutput output = resultStorage.getDataOutput();
                     @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ABoolean> booleanSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+                    private ISerializerDeserializer<ABoolean> booleanSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABOOLEAN);
                     @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
+                    private ISerializerDeserializer<ANull> nullSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
                     @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AMissing> missingSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AMISSING);
+                    private ISerializerDeserializer<AMissing> missingSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING);
 
                     @Override
                     public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
@@ -107,7 +107,7 @@ public class OrDescriptor extends AbstractScalarFunctionDynamicDescriptor {
                                         ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
                             }
                             boolean argResult = ABooleanSerializerDeserializer.getBoolean(data, offset + 1);
-                            if (argResult == true) {
+                            if (argResult) {
                                 // anything OR TRUE = TRUE
                                 booleanSerde.serialize(ABoolean.TRUE, output);
                                 result.set(resultStorage);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-transactions/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/pom.xml b/asterixdb/asterix-transactions/pom.xml
index 794595f..a65a436 100644
--- a/asterixdb/asterix-transactions/pom.xml
+++ b/asterixdb/asterix-transactions/pom.xml
@@ -24,7 +24,6 @@
     <version>0.9.1-SNAPSHOT</version>
   </parent>
   <artifactId>asterix-transactions</artifactId>
-
   <licenses>
     <license>
       <name>Apache License, Version 2.0</name>
@@ -33,11 +32,9 @@
       <comments>A business-friendly OSS license</comments>
     </license>
   </licenses>
-
   <properties>
     <appendedResourcesDirectory>${basedir}/../src/main/appended-resources</appendedResourcesDirectory>
   </properties>
-
   <build>
     <plugins>
       <plugin>
@@ -83,9 +80,7 @@
         </executions>
       </plugin>
     </plugins>
-
   </build>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
@@ -111,6 +106,11 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-om</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
@@ -149,6 +149,13 @@
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-storage-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-storage-am-bloomfilter</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>algebricks-runtime</artifactId>
+    </dependency>
   </dependencies>
-
-</project>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
new file mode 100644
index 0000000..d38c5b7
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.transaction.management.runtime;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.common.transactions.ILogMarkerCallback;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.common.utils.TransactionUtil;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.utils.TaskUtil;
+import org.apache.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
+
+public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+
+    private final static long SEED = 0L;
+
+    protected final ITransactionManager transactionManager;
+    protected final ILogManager logMgr;
+    protected final JobId jobId;
+    protected final int datasetId;
+    protected final int[] primaryKeyFields;
+    protected final boolean isTemporaryDatasetWriteJob;
+    protected final boolean isWriteTransaction;
+    protected final long[] longHashes;
+    protected final IHyracksTaskContext ctx;
+    protected final int resourcePartition;
+    protected ITransactionContext transactionContext;
+    protected LogRecord logRecord;
+    protected final boolean isSink;
+
+    public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
+            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, boolean isSink) {
+        this.ctx = ctx;
+        IAppRuntimeContext runtimeCtx =
+                (IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
+        this.transactionManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
+        this.logMgr = runtimeCtx.getTransactionSubsystem().getLogManager();
+        this.jobId = jobId;
+        this.datasetId = datasetId;
+        this.primaryKeyFields = primaryKeyFields;
+        this.tRef = new FrameTupleReference();
+        this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob;
+        this.isWriteTransaction = isWriteTransaction;
+        this.resourcePartition = resourcePartition;
+        this.isSink = isSink;
+        longHashes = new long[2];
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        try {
+            transactionContext = transactionManager.getTransactionContext(jobId, false);
+            transactionContext.setWriteTxn(isWriteTransaction);
+            ILogMarkerCallback callback =
+                    TaskUtil.<ILogMarkerCallback>get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
+            logRecord = new LogRecord(callback);
+            if (isSink) {
+                return;
+            }
+            initAccessAppend(ctx);
+            writer.open();
+        } catch (ACIDException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        tAccess.reset(buffer);
+        int nTuple = tAccess.getTupleCount();
+        for (int t = 0; t < nTuple; t++) {
+            if (isTemporaryDatasetWriteJob) {
+                /**
+                 * This "if branch" is for writes over temporary datasets. A temporary dataset does not require any lock
+                 * and does not generate any write-ahead update or commit log, but it does generate flush and job commit
+                 * logs. However, a temporary dataset MUST still guarantee the no-steal policy, so this notification
+                 * must be delivered to PrimaryIndexOptracker and used correctly to decrement the active operation
+                 * count of PrimaryIndexOptracker. By maintaining the count correctly and only allowing flushing when
+                 * the count is 0, the no-steal policy is guaranteed for temporary datasets, too.
+                 */
+                transactionContext.notifyOptracker(false);
+            } else {
+                tRef.reset(tAccess, t);
+                try {
+                    formLogRecord(buffer, t);
+                    logMgr.log(logRecord);
+                    if (!isSink) {
+                        appendTupleToFrame(t);
+                    }
+                } catch (ACIDException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        }
+        VSizeFrame message = TaskUtil.<VSizeFrame>get(HyracksConstants.KEY_MESSAGE, ctx);
+        if (message != null
+                && MessagingFrameTupleAppender.getMessageType(message) == MessagingFrameTupleAppender.MARKER_MESSAGE) {
+            try {
+                formMarkerLogRecords(message.getBuffer());
+                logMgr.log(logRecord);
+            } catch (ACIDException e) {
+                throw new HyracksDataException(e);
+            }
+            message.reset();
+            message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+            message.getBuffer().flip();
+        }
+    }
+
+    private void formMarkerLogRecords(ByteBuffer marker) {
+        TransactionUtil.formMarkerLogRecord(logRecord, transactionContext, datasetId, resourcePartition, marker);
+    }
+
+    protected void formLogRecord(ByteBuffer buffer, int t) {
+        int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields);
+        TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, tRef,
+                primaryKeyFields, resourcePartition, LogType.ENTITY_COMMIT);
+    }
+
+    protected int computePrimaryKeyHashValue(ITupleReference tuple, int[] primaryKeyFields) {
+        MurmurHash128Bit.hash3_x64_128(tuple, primaryKeyFields, SEED, longHashes);
+        return Math.abs((int) longHashes[0]);
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        failed = true;
+        if (isSink) {
+            return;
+        }
+        writer.fail();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (isSink) {
+            return;
+        }
+        flushIfNotFailed();
+        writer.close();
+        appender.reset(frame, true);
+    }
+
+    @Override
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        this.inputRecordDesc = recordDescriptor;
+        this.tAccess = new FrameTupleAccessor(inputRecordDesc);
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        // Commit is at the end of a modification pipeline and there is no need to flush
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
new file mode 100644
index 0000000..536e657
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.transaction.management.runtime;
+
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class CommitRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final JobId jobId;
+    private final int datasetId;
+    private final int[] primaryKeyFields;
+    private final boolean isTemporaryDatasetWriteJob;
+    private final boolean isWriteTransaction;
+    private final int upsertVarIdx;
+    private int[] datasetPartitions;
+    private final boolean isSink;
+
+    public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob,
+            boolean isWriteTransaction, int upsertVarIdx, int[] datasetPartitions, boolean isSink) {
+        this.jobId = jobId;
+        this.datasetId = datasetId;
+        this.primaryKeyFields = primaryKeyFields;
+        this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob;
+        this.isWriteTransaction = isWriteTransaction;
+        this.upsertVarIdx = upsertVarIdx;
+        this.datasetPartitions = datasetPartitions;
+        this.isSink = isSink;
+    }
+
+    @Override
+    public String toString() {
+        return "commit";
+    }
+
+    @Override
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+        if (upsertVarIdx >= 0) {
+            return new UpsertCommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob,
+                    isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()],
+                    upsertVarIdx, isSink);
+        } else {
+            return new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob,
+                    isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
new file mode 100644
index 0000000..9b2fe36
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.runtime;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.common.utils.TransactionUtil;
+import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class UpsertCommitRuntime extends CommitRuntime {
+    private final int upsertIdx;
+
+    public UpsertCommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
+            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, int upsertIdx,
+            boolean isSink) {
+        super(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, isWriteTransaction,
+                resourcePartition, isSink);
+        this.upsertIdx = upsertIdx;
+    }
+
+    @Override
+    protected void formLogRecord(ByteBuffer buffer, int t) {
+        boolean isNull = ABooleanSerializerDeserializer.getBoolean(buffer.array(), tAccess.getFieldSlotsLength()
+                + tAccess.getTupleStartOffset(t) + tAccess.getFieldStartOffset(t, upsertIdx) + 1);
+        if (isNull) {
+            // Previous record not found (insert)
+            super.formLogRecord(buffer, t);
+        } else {
+            // Previous record found (delete + insert)
+            int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields);
+            TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, tRef,
+                    primaryKeyFields, resourcePartition, LogType.UPSERT_ENTITY_COMMIT);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java
index db0b483..3df27ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java
@@ -55,7 +55,7 @@ public final class BooleanPointable extends AbstractPointable implements IHashab
     };
 
     public static boolean getBoolean(byte[] bytes, int start) {
-        return bytes[start] == 0 ? false : true;
+        return bytes[start] != 0;
     }
 
     public static void setBoolean(byte[] bytes, int start, boolean value) {


Mime
View raw message