asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Steven Jacobs (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: [ASTERIXDB-2167][TX][RT] Remove TxnId from JobSpecification
Date Thu, 30 Nov 2017 00:30:54 GMT
Steven Jacobs has submitted this change and it was merged.

Change subject: [ASTERIXDB-2167][TX][RT] Remove TxnId from JobSpecification
......................................................................


[ASTERIXDB-2167][TX][RT] Remove TxnId from JobSpecification

- user model changes: no
- storage format changes: no
- interface changes: IJobEventListenerFactory

details:
- Remove the TxnId from the compiled job specification
- This enables one job spec to be used by multiple jobs/transactions
- Runtime operators who need the TxnId will pull it from the EventListener

Change-Id: I9526d50b31aebc3bf971d95ba3edf29c0c1066a7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2154
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
28 files changed, 106 insertions(+), 160 deletions(-)

Approvals:
  abdullah alamoudi: Looks good to me, approved
  Jenkins: Verified; No violations found; ; Verified

Objections:
  Anon. E. Moose #1000171: Violations found



diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
index abd18aa..09092ff 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -21,7 +21,6 @@
 
 import java.util.List;
 
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -43,12 +42,10 @@
 public class CommitPOperator extends AbstractPhysicalOperator {
 
     private final List<LogicalVariable> primaryKeyLogicalVars;
-    private final TxnId txnId;
     private final Dataset dataset;
     private final boolean isSink;
 
-    public CommitPOperator(TxnId txnId, Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars, boolean isSink) {
-        this.txnId = txnId;
+    public CommitPOperator(Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars, boolean isSink) {
         this.dataset = dataset;
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
         this.isSink = isSink;
@@ -87,7 +84,7 @@
         int[] primaryKeyFields = JobGenHelper.variablesToFieldIndexes(primaryKeyLogicalVars, inputSchemas[0]);
 
         //get dataset splits
-        IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(metadataProvider, txnId, primaryKeyFields,
+        IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(metadataProvider, primaryKeyFields,
                 isSink);
         builder.contributeMicroOperator(op, runtime, recDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index c941320..c3cc0ae 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -30,7 +30,6 @@
 import org.apache.asterix.optimizer.rules.am.InvertedIndexAccessMethod;
 import org.apache.asterix.optimizer.rules.am.InvertedIndexAccessMethod.SearchModifierType;
 import org.apache.asterix.optimizer.rules.am.InvertedIndexJobGenParams;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -158,7 +157,6 @@
                 jobSpec, outputRecDesc, queryField, dataflowHelperFactory, queryTokenizerFactory, searchModifierFactory,
                 retainInput, retainMissing, context.getMissingWriterFactory(),
                 dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(), secondaryIndex,
-                        ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getTxnId(),
                         IndexOperation.SEARCH, null),
                 minFilterFieldIndexes, maxFilterFieldIndexes, isFullTextSearchQuery, numPrimaryKeys,
                 propagateIndexFilter);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
index 61339bf..7dfe161 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
@@ -23,9 +23,7 @@
 
 import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.algebra.operators.physical.CommitPOperator;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
-import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -99,14 +97,10 @@
             primaryKeyLogicalVars.add(new LogicalVariable(varRefExpr.getVariableReference().getId()));
         }
 
-        //get TxnId(TransactorId)
-        MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
-        TxnId txnId = mp.getTxnId();
-
         //create the logical and physical operator
         CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, isSink);
         CommitPOperator commitPOperator =
-                new CommitPOperator(txnId, dataset, primaryKeyLogicalVars, isSink);
+                new CommitPOperator(dataset, primaryKeyLogicalVars, isSink);
         commitOperator.setPhysicalOperator(commitPOperator);
 
         //create ExtensionOperator and put the commitOperator in it.
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index e42b5e5..9dddda4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -279,7 +279,7 @@
         Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorIdMapping = new HashMap<>();
         Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = new HashMap<>();
         Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>();
-        List<TxnId> txnIds = new ArrayList<>();
+        Map<Integer, TxnId> txnIdMap = new HashMap<>();
         FeedMetaOperatorDescriptor metaOp;
 
         for (int iter1 = 0; iter1 < jobsList.size(); iter1++) {
@@ -415,11 +415,16 @@
             for (OperatorDescriptorId root : subJob.getRoots()) {
                 jobSpec.addRoot(jobSpec.getOperatorMap().get(operatorIdMapping.get(root)));
             }
-            txnIds.add(((JobEventListenerFactory) subJob.getJobletEventListenerFactory()).getTxnId());
+            int datasetId = metadataProvider
+                    .findDataset(curFeedConnection.getDataverseName(), curFeedConnection.getDatasetName())
+                    .getDatasetId();
+            TxnId txnId = ((JobEventListenerFactory) subJob.getJobletEventListenerFactory()).getTxnId(datasetId);
+            txnIdMap.put(datasetId, txnId);
         }
 
         // jobEventListenerFactory
-        jobSpec.setJobletEventListenerFactory(new MultiTransactionJobletEventListenerFactory(txnIds, true));
+        jobSpec.setJobletEventListenerFactory(
+                new MultiTransactionJobletEventListenerFactory(txnIdMap, true));
         // useConnectorSchedulingPolicy
         jobSpec.setUseConnectorPolicyForScheduling(jobsList.get(0).isUseConnectorPolicyForScheduling());
         // connectorAssignmentPolicy
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 42f577f..a04c994 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -283,13 +283,13 @@
         IOperatorDescriptor starter = DatasetUtil.createDummyKeyProviderOp(spec, source, metadataProvider);
 
         // Creates primary index scan op.
-        IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, source, txnId);
+        IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, source);
 
         // Creates secondary BTree upsert op.
         IOperatorDescriptor upsertOp = createPrimaryIndexUpsertOp(spec, metadataProvider, source, target);
 
         // The final commit operator.
-        IOperatorDescriptor commitOp = createUpsertCommitOp(spec, metadataProvider, txnId, target);
+        IOperatorDescriptor commitOp = createUpsertCommitOp(spec, metadataProvider, target);
 
         // Connects empty-tuple-source and scan.
         spec.connect(new OneToOneConnectorDescriptor(spec), starter, 0, primaryScanOp, 0);
@@ -326,11 +326,11 @@
 
     // Creates the commit operator for populating the target dataset.
     private static IOperatorDescriptor createUpsertCommitOp(JobSpecification spec, MetadataProvider metadataProvider,
-            TxnId txnId, Dataset target) throws AlgebricksException {
+            Dataset target) throws AlgebricksException {
         int[] primaryKeyFields = getPrimaryKeyPermutationForUpsert(target);
         return new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
                 new IPushRuntimeFactory[] {
-                        target.getCommitRuntimeFactory(metadataProvider, txnId, primaryKeyFields, true) },
+                        target.getCommitRuntimeFactory(metadataProvider, primaryKeyFields, true) },
                 new RecordDescriptor[] { target.getPrimaryRecordDescriptor(metadataProvider) });
     }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 352a5f8..a1c2ee6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -183,7 +183,7 @@
                     mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators);
             IndexOperation op = IndexOperation.INSERT;
             IModificationOperationCallbackFactory modOpCallbackFactory =
-                    new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(ctx), dataset.getDatasetId(),
+                    new PrimaryIndexModificationOperationCallbackFactory(dataset.getDatasetId(),
                             primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, Operation.get(op),
                             ResourceType.LSM_BTREE);
             IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
@@ -614,9 +614,9 @@
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, keyTypes, recordType, metaType,
                 mergePolicy.first, mergePolicy.second, filterFields, keyIndexes, keyIndicators);
         IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
-                storageComponentProvider, primaryIndexInfo.index, getTxnJobId(ctx), IndexOperation.UPSERT, keyIndexes);
+                storageComponentProvider, primaryIndexInfo.index, IndexOperation.UPSERT, keyIndexes);
         ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
-                storageComponentProvider, primaryIndexInfo.index, getTxnJobId(ctx), IndexOperation.UPSERT, keyIndexes);
+                storageComponentProvider, primaryIndexInfo.index, IndexOperation.UPSERT, keyIndexes);
         IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
         IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
                 storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
index 0f37b13..acb3ae8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
@@ -22,9 +22,9 @@
 import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 
 /**
- * an interface for JobEventListenerFactories to add Asterix transaction JobId getter
+ * an interface for JobEventListenerFactories to add Asterix txnId getter
  */
 public interface IJobEventListenerFactory extends IJobletEventListenerFactory {
 
-    TxnId getTxnId(TxnId compiledTxnId);
+    TxnId getTxnId(int datasetId);
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
index d2b1276..ce6671b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
@@ -26,15 +26,13 @@
 public abstract class AbstractOperationCallbackFactory implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    protected final TxnId txnId;
     protected final int datasetId;
     protected final int[] primaryKeyFields;
     protected final ITransactionSubsystemProvider txnSubsystemProvider;
     protected final byte resourceType;
 
-    public AbstractOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields,
+    public AbstractOperationCallbackFactory(int datasetId, int[] primaryKeyFields,
             ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) {
-        this.txnId = txnId;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
         this.txnSubsystemProvider = txnSubsystemProvider;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 305cdfa..1e0d597 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -84,7 +84,6 @@
 import org.apache.asterix.om.utils.NonTaggedFormatUtil;
 import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
 import org.apache.asterix.runtime.formats.FormatUtils;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor;
 import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
 import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
@@ -446,7 +445,7 @@
         }
 
         ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
-                storageComponentProvider, theIndex, txnId, IndexOperation.SEARCH, primaryKeyFields);
+                storageComponentProvider, theIndex, IndexOperation.SEARCH, primaryKeyFields);
         IStorageManager storageManager = getStorageComponentProvider().getStorageManager();
         IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(storageManager, spPc.first);
         BTreeSearchOperatorDescriptor btreeSearchOp;
@@ -485,7 +484,7 @@
         }
 
         ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
-                storageComponentProvider, secondaryIndex, txnId, IndexOperation.SEARCH, primaryKeyFields);
+                storageComponentProvider, secondaryIndex, IndexOperation.SEARCH, primaryKeyFields);
         RTreeSearchOperatorDescriptor rtreeSearchOp;
         IIndexDataflowHelperFactory indexDataflowHelperFactory =
                 new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), spPc.first);
@@ -789,7 +788,7 @@
             // files index
             RecordDescriptor outRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
             ISearchOperationCallbackFactory searchOpCallbackFactory = dataset
-                    .getSearchCallbackFactory(storageComponentProvider, fileIndex, txnId, IndexOperation.SEARCH, null);
+                    .getSearchCallbackFactory(storageComponentProvider, fileIndex, IndexOperation.SEARCH, null);
             // Create the operator
             ExternalLookupOperatorDescriptor op = new ExternalLookupOperatorDescriptor(jobSpec, adapterFactory,
                     outRecDesc, indexDataflowHelperFactory, searchOpCallbackFactory,
@@ -959,7 +958,7 @@
             primaryKeyFields[i] = i;
         }
         IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
-                storageComponentProvider, primaryIndex, txnId, indexOp, primaryKeyFields);
+                storageComponentProvider, primaryIndex, indexOp, primaryKeyFields);
         IIndexDataflowHelperFactory idfh =
                 new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
         IOperatorDescriptor op;
@@ -1081,9 +1080,8 @@
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
             // prepare callback
-            TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId();
             IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
-                    storageComponentProvider, secondaryIndex, txnId, indexOp, modificationCallbackPrimaryKeyFields);
+                    storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
             IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(
                     storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
             IOperatorDescriptor op;
@@ -1179,9 +1177,8 @@
                 getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
 
         // prepare callback
-        TxnId planTxnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId();
         IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
-                storageComponentProvider, secondaryIndex, planTxnId, indexOp, modificationCallbackPrimaryKeyFields);
+                storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
         IIndexDataflowHelperFactory indexDataflowHelperFactory =
                 new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
         IOperatorDescriptor op;
@@ -1289,9 +1286,8 @@
                     getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
 
             // prepare callback
-            TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId();
             IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
-                    storageComponentProvider, secondaryIndex, txnId, indexOp, modificationCallbackPrimaryKeyFields);
+                    storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
             IIndexDataflowHelperFactory indexDataFlowFactory = new IndexDataflowHelperFactory(
                     storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
             IOperatorDescriptor op;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index e6c0de8..2386d77 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -42,7 +42,6 @@
 import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
 import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.common.utils.JobUtils.ProgressState;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
@@ -541,8 +540,6 @@
      *
      * @param index
      *            the index
-     * @param txnId
-     *            the job id being compiled
      * @param op
      *            the operation this search is part of
      * @param primaryKeyFields
@@ -553,21 +550,21 @@
      *             if the callback factory could not be created
      */
     public ISearchOperationCallbackFactory getSearchCallbackFactory(IStorageComponentProvider storageComponentProvider,
-            Index index, TxnId txnId, IndexOperation op, int[] primaryKeyFields) throws AlgebricksException {
+            Index index, IndexOperation op, int[] primaryKeyFields) throws AlgebricksException {
         if (index.isPrimaryIndex()) {
             /*
              * Due to the read-committed isolation level,
              * we may acquire very short duration lock(i.e., instant lock) for readers.
              */
-            return (op == IndexOperation.UPSERT) ?
-                    new LockThenSearchOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields,
-                            storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE) :
-                    new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields,
+            return (op == IndexOperation.UPSERT)
+                    ? new LockThenSearchOperationCallbackFactory(getDatasetId(), primaryKeyFields,
+                            storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE)
+                    : new PrimaryIndexInstantSearchOperationCallbackFactory(getDatasetId(), primaryKeyFields,
                             storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE);
         } else if (index.getKeyFieldNames().isEmpty()) {
             // this is the case where the index is secondary primary index and locking is required
             // since the secondary primary index replaces the dataset index (which locks)
-            return new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields,
+            return new PrimaryIndexInstantSearchOperationCallbackFactory(getDatasetId(), primaryKeyFields,
                     storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE);
         }
         return new SecondaryIndexSearchOperationCallbackFactory();
@@ -578,8 +575,6 @@
      *
      * @param index
      *            the index
-     * @param txnId
-     *            the job id of the job being compiled
      * @param op
      *            the operation performed for this callback
      * @param primaryKeyFields
@@ -590,24 +585,23 @@
      *             If the callback factory could not be created
      */
     public IModificationOperationCallbackFactory getModificationCallbackFactory(
-            IStorageComponentProvider componentProvider, Index index, TxnId txnId, IndexOperation op,
+            IStorageComponentProvider componentProvider, Index index, IndexOperation op,
             int[] primaryKeyFields) throws AlgebricksException {
         if (index.isPrimaryIndex()) {
-            return op == IndexOperation.UPSERT ?
-                    new UpsertOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields,
+            return op == IndexOperation.UPSERT ? new UpsertOperationCallbackFactory(getDatasetId(), primaryKeyFields,
                             componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
-                            index.resourceType()) :
-                    op == IndexOperation.DELETE || op == IndexOperation.INSERT ?
-                            new PrimaryIndexModificationOperationCallbackFactory(txnId, getDatasetId(),
+                            index.resourceType())
+                    : op == IndexOperation.DELETE || op == IndexOperation.INSERT
+                            ? new PrimaryIndexModificationOperationCallbackFactory(getDatasetId(),
                                     primaryKeyFields, componentProvider.getTransactionSubsystemProvider(),
-                                    Operation.get(op), index.resourceType()) :
-                            NoOpOperationCallbackFactory.INSTANCE;
+                                    Operation.get(op), index.resourceType())
+                            : NoOpOperationCallbackFactory.INSTANCE;
         } else {
-            return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT ?
-                    new SecondaryIndexModificationOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields,
+            return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT
+                    ? new SecondaryIndexModificationOperationCallbackFactory(getDatasetId(), primaryKeyFields,
                             componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
-                            index.resourceType()) :
-                    NoOpOperationCallbackFactory.INSTANCE;
+                            index.resourceType())
+                    : NoOpOperationCallbackFactory.INSTANCE;
         }
     }
 
@@ -651,8 +645,6 @@
      *
      * @param metadataProvider,
      *            the metadata provider.
-     * @param txnId,
-     *            the AsterixDB job id for transaction management.
      * @param primaryKeyFieldPermutation,
      *            the primary key field permutation according to the input.
      * @param isSink,
@@ -660,10 +652,10 @@
      * @return the commit runtime factory for inserting/upserting/deleting operations on this dataset.
      * @throws AlgebricksException
      */
-    public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider, TxnId txnId,
+    public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider,
             int[] primaryKeyFieldPermutation, boolean isSink) throws AlgebricksException {
         int[] datasetPartitions = getDatasetPartitions(metadataProvider);
-        return new CommitRuntimeFactory(txnId, datasetId, primaryKeyFieldPermutation,
+        return new CommitRuntimeFactory(datasetId, primaryKeyFieldPermutation,
                 metadataProvider.isWriteTransaction(), datasetPartitions, isSink);
     }
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 5973c06..3d05c0e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -38,7 +38,6 @@
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.transactions.IRecoveryManager;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.formats.base.IDataFormat;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -57,7 +56,6 @@
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
@@ -335,13 +333,11 @@
      *            the metadata provider.
      * @param dataset,
      *            the dataset to scan.
-     * @param txnId,
-     *            the AsterixDB job id for transaction management.
      * @return a primary index scan operator.
      * @throws AlgebricksException
      */
     public static IOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec, MetadataProvider metadataProvider,
-            Dataset dataset, TxnId txnId) throws AlgebricksException {
+            Dataset dataset) throws AlgebricksException {
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
                 metadataProvider.getSplitProviderAndConstraints(dataset);
         IFileSplitProvider primaryFileSplitProvider = primarySplitsAndConstraint.first;
@@ -351,8 +347,8 @@
         // +Infinity
         int[] highKeyFields = null;
         ITransactionSubsystemProvider txnSubsystemProvider = TransactionSubsystemProvider.INSTANCE;
-        ISearchOperationCallbackFactory searchCallbackFactory =
-                new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, dataset.getDatasetId(),
+        ISearchOperationCallbackFactory searchCallbackFactory = new PrimaryIndexInstantSearchOperationCallbackFactory(
+                dataset.getDatasetId(),
                         dataset.getPrimaryBloomFilterFields(), txnSubsystemProvider,
                         IRecoveryManager.ResourceType.LSM_BTREE);
         IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
@@ -396,7 +392,6 @@
                 metadataProvider.getSplitProviderAndConstraints(dataset);
 
         // prepare callback
-        TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId(null);
         int[] primaryKeyFields = new int[numKeys];
         for (int i = 0; i < numKeys; i++) {
             primaryKeyFields[i] = i;
@@ -405,9 +400,9 @@
                 metadataProvider.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName()).size() > 1;
         IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
         IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
-                storageComponentProvider, primaryIndex, txnId, IndexOperation.UPSERT, primaryKeyFields);
+                storageComponentProvider, primaryIndex, IndexOperation.UPSERT, primaryKeyFields);
         ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
-                storageComponentProvider, primaryIndex, txnId, IndexOperation.UPSERT, primaryKeyFields);
+                storageComponentProvider, primaryIndex, IndexOperation.UPSERT, primaryKeyFields);
         IIndexDataflowHelperFactory idfh =
                 new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
         LSMPrimaryUpsertOperatorDescriptor op;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index e6a24e3..2ebfe78 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.metadata.utils;
 
-import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.*;
+import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
 
 import java.util.EnumSet;
 import java.util.List;
@@ -161,13 +161,12 @@
      *            the metadata provider.
      * @return the AsterixDB job id for transaction management.
      */
-    public static TxnId bindJobEventListener(JobSpecification spec, MetadataProvider metadataProvider) {
+    public static void bindJobEventListener(JobSpecification spec, MetadataProvider metadataProvider) {
         TxnId txnId = TxnIdFactory.create();
         metadataProvider.setTxnId(txnId);
         boolean isWriteTransaction = metadataProvider.isWriteTransaction();
         IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(txnId, isWriteTransaction);
         spec.setJobletEventListenerFactory(jobEventListenerFactory);
-        return txnId;
     }
 
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
index 8f70f21..41def96 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
@@ -22,7 +22,6 @@
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.GlobalConfig;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -129,11 +128,10 @@
             // Create dummy key provider for feeding the primary index scan.
             IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset,
                     metadataProvider);
-            TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+            IndexUtil.bindJobEventListener(spec, metadataProvider);
 
             // Create primary index scan op.
-            IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset,
-                    txnId);
+            IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset);
 
             // Assign op.
             IOperatorDescriptor sourceOp = primaryScanOp;
@@ -199,7 +197,6 @@
      *      ====== ========= ........ ........
      */
     @Override
-    @SuppressWarnings("rawtypes")
     protected void setSecondaryRecDescAndComparators() throws AlgebricksException {
         int numSecondaryKeys = index.getKeyFieldNames().size();
         secondaryFieldAccessEvalFactories = new IScalarEvaluatorFactory[numSecondaryKeys + numFilterFields];
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
index 89bd4b1..7791cad 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
@@ -21,7 +21,6 @@
 import java.util.List;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
@@ -72,14 +71,14 @@
 
         // only handle internal datasets
         // Create dummy key provider for feeding the primary index scan.
-        TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+        IndexUtil.bindJobEventListener(spec, metadataProvider);
 
         // Create dummy key provider for feeding the primary index scan.
         IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
 
         // Create primary index scan op.
         IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
-                getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), txnId);
+                getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)));
 
         // Assign op.
         IOperatorDescriptor sourceOp = primaryScanOp;
@@ -124,7 +123,6 @@
     }
 
     @Override
-    @SuppressWarnings("rawtypes")
     protected void setSecondaryRecDescAndComparators() throws AlgebricksException {
         int numSecondaryKeys = index.getKeyFieldNames().size();
         secondaryFieldAccessEvalFactories = new IScalarEvaluatorFactory[numSecondaryKeys + numFilterFields];
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
index 93cc11d..b91d65f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
@@ -21,7 +21,6 @@
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -79,7 +78,6 @@
     }
 
     @Override
-    @SuppressWarnings("rawtypes")
     protected void setSecondaryRecDescAndComparators() throws AlgebricksException {
         int numSecondaryKeys = index.getKeyFieldNames().size();
         IndexType indexType = index.getIndexType();
@@ -206,14 +204,14 @@
     @Override
     public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
         JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+        IndexUtil.bindJobEventListener(spec, metadataProvider);
 
         // Create dummy key provider for feeding the primary index scan.
         IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
 
         // Create primary index scan op.
         IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
-                getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), txnId);
+                getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)));
 
         IOperatorDescriptor sourceOp = primaryScanOp;
         boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
index 1333493..bf5178c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
@@ -23,7 +23,6 @@
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
@@ -52,7 +51,6 @@
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 
-@SuppressWarnings("rawtypes")
 public class SecondaryCorrelatedRTreeOperationsHelper extends SecondaryCorrelatedTreeIndexOperationsHelper {
 
     protected IPrimitiveValueProviderFactory[] valueProviderFactories;
@@ -184,11 +182,11 @@
 
         // Create dummy key provider for feeding the primary index scan.
         IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
-        TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+        IndexUtil.bindJobEventListener(spec, metadataProvider);
 
         // Create primary index scan op.
         IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
-                getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), txnId);
+                getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)));
 
         // Assign op.
         IOperatorDescriptor sourceOp = primaryScanOp;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
index 2a4a952..0a772fa 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
@@ -24,7 +24,6 @@
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.transactions.IRecoveryManager;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -102,7 +101,6 @@
     }
 
     protected RecordDescriptor getTaggedRecordDescriptor(RecordDescriptor recDescriptor) {
-        @SuppressWarnings("rawtypes")
         ISerializerDeserializer[] fields =
                 new ISerializerDeserializer[recDescriptor.getFields().length + NUM_TAG_FIELDS];
         ITypeTraits[] traits = null;
@@ -273,10 +271,10 @@
     }
 
     protected IOperatorDescriptor createPrimaryIndexScanDiskComponentsOp(JobSpecification spec,
-            MetadataProvider metadataProvider, RecordDescriptor outRecDesc, TxnId txnId) throws AlgebricksException {
+            MetadataProvider metadataProvider, RecordDescriptor outRecDesc) throws AlgebricksException {
         ITransactionSubsystemProvider txnSubsystemProvider = TransactionSubsystemProvider.INSTANCE;
-        ISearchOperationCallbackFactory searchCallbackFactory =
-                new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, dataset.getDatasetId(),
+        ISearchOperationCallbackFactory searchCallbackFactory = new PrimaryIndexInstantSearchOperationCallbackFactory(
+                dataset.getDatasetId(),
                         dataset.getPrimaryBloomFilterFields(), txnSubsystemProvider,
                         IRecoveryManager.ResourceType.LSM_BTREE);
         IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
index 3626f16..1c9eb74 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
@@ -22,7 +22,6 @@
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -82,7 +81,6 @@
     }
 
     @Override
-    @SuppressWarnings("rawtypes")
     protected void setSecondaryRecDescAndComparators() throws AlgebricksException {
         int numSecondaryKeys = index.getKeyFieldNames().size();
         IndexType indexType = index.getIndexType();
@@ -208,14 +206,14 @@
     @Override
     public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
         JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+        IndexUtil.bindJobEventListener(spec, metadataProvider);
 
         // Create dummy key provider for feeding the primary index scan.
         IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
 
         // Create primary index scan op.
         IOperatorDescriptor primaryScanOp =
-                DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset, txnId);
+                DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset);
 
         IOperatorDescriptor sourceOp = primaryScanOp;
         boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
index 613df21..8e6e0e9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
@@ -23,7 +23,6 @@
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
@@ -58,7 +57,6 @@
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 
-@SuppressWarnings("rawtypes")
 public class SecondaryRTreeOperationsHelper extends SecondaryTreeIndexOperationsHelper {
 
     protected IPrimitiveValueProviderFactory[] valueProviderFactories;
@@ -201,11 +199,10 @@
         if (dataset.getDatasetType() == DatasetType.INTERNAL) {
             // Create dummy key provider for feeding the primary index scan.
             IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
-            TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+            IndexUtil.bindJobEventListener(spec, metadataProvider);
 
             // Create primary index scan op.
-            IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset,
-                    txnId);
+            IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset);
 
             // Assign op.
             IOperatorDescriptor sourceOp = primaryScanOp;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
index d3c3fe7..0de61ff 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -48,12 +48,8 @@
         this.transactionalWrite = transactionalWrite;
     }
 
-    public TxnId getTxnId() {
-        return txnId;
-    }
-
     @Override
-    public TxnId getTxnId(TxnId compiledTxnId) {
+    public TxnId getTxnId(int datasetId) {
         return txnId;
     }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
index bfe1925..656ea09 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.runtime.job.listener;
 
-import java.util.List;
+import java.util.Map;
 
 import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -40,23 +40,22 @@
 public class MultiTransactionJobletEventListenerFactory implements IJobEventListenerFactory {
 
     private static final long serialVersionUID = 1L;
-    private final List<TxnId> txnIds;
+    private final Map<Integer, TxnId> txnIdMap;
     private final boolean transactionalWrite;
 
-    public MultiTransactionJobletEventListenerFactory(List<TxnId> txnIds, boolean transactionalWrite) {
-        this.txnIds = txnIds;
+    public MultiTransactionJobletEventListenerFactory(Map<Integer, TxnId> txnIdMap, boolean transactionalWrite) {
+        this.txnIdMap = txnIdMap;
         this.transactionalWrite = transactionalWrite;
     }
 
-    //TODO: Enable this factory to be usable for Deployed Jobs
     @Override
-    public TxnId getTxnId(TxnId compiledTxnId) {
-        return compiledTxnId;
+    public TxnId getTxnId(int datasetId) {
+        return txnIdMap.get(datasetId);
     }
 
     @Override
     public IJobletEventListenerFactory copyFactory() {
-        return new MultiTransactionJobletEventListenerFactory(txnIds, transactionalWrite);
+        return new MultiTransactionJobletEventListenerFactory(txnIdMap, transactionalWrite);
     }
 
     @Override
@@ -74,13 +73,13 @@
                     ITransactionManager txnManager =
                             ((INcApplicationContext) jobletContext.getServiceContext().getApplicationContext())
                                     .getTransactionSubsystem().getTransactionManager();
-                    for (TxnId txnId : txnIds) {
-                        ITransactionContext txnContext = txnManager.getTransactionContext(txnId);
+                    for (TxnId subTxnId : txnIdMap.values()) {
+                        ITransactionContext txnContext = txnManager.getTransactionContext(subTxnId);
                         txnContext.setWriteTxn(transactionalWrite);
                         if (jobStatus != JobStatus.FAILURE) {
-                            txnManager.commitTransaction(txnId);
+                            txnManager.commitTransaction(subTxnId);
                         } else {
-                            txnManager.abortTransaction(txnId);
+                            txnManager.abortTransaction(subTxnId);
                         }
                     }
                 } catch (ACIDException e) {
@@ -93,9 +92,10 @@
                 try {
                     TransactionOptions options =
                             new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL);
-                    for (TxnId txnId : txnIds) {
+                    for (TxnId subTxnId : txnIdMap.values()) {
                         ((INcApplicationContext) jobletContext.getServiceContext().getApplicationContext())
-                                .getTransactionSubsystem().getTransactionManager().beginTransaction(txnId, options);
+                                .getTransactionSubsystem().getTransactionManager()
+                                .beginTransaction(subTxnId, options);
                     }
                 } catch (ACIDException e) {
                     throw new Error(e);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
index 9f96263..1346b76 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
@@ -25,7 +25,6 @@
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -37,9 +36,9 @@
 
     private static final long serialVersionUID = 1L;
 
-    public LockThenSearchOperationCallbackFactory(TxnId txnId, int datasetId, int[] entityIdFields,
+    public LockThenSearchOperationCallbackFactory(int datasetId, int[] entityIdFields,
             ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) {
-        super(txnId, datasetId, entityIdFields, txnSubsystemProvider, resourceType);
+        super(datasetId, entityIdFields, txnSubsystemProvider, resourceType);
     }
 
     @Override
@@ -49,7 +48,7 @@
         try {
             IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId));
             return new LockThenSearchOperationCallback(new DatasetId(datasetId), resourceId, primaryKeyFields,
                     txnSubsystem, txnCtx, operatorNodePushable);
         } catch (ACIDException e) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
index f9c8e3c..d4242bf 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
@@ -26,7 +26,6 @@
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -39,9 +38,9 @@
 
     private static final long serialVersionUID = 1L;
 
-    public PrimaryIndexInstantSearchOperationCallbackFactory(TxnId txnId, int datasetId, int[] entityIdFields,
+    public PrimaryIndexInstantSearchOperationCallbackFactory(int datasetId, int[] entityIdFields,
             ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) {
-        super(txnId, datasetId, entityIdFields, txnSubsystemProvider, resourceType);
+        super(datasetId, entityIdFields, txnSubsystemProvider, resourceType);
     }
 
     @Override
@@ -51,7 +50,7 @@
         try {
             IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId));
             return new PrimaryIndexInstantSearchOperationCallback(new DatasetId(datasetId), resourceId,
                     primaryKeyFields, txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 8f5e386..97fd7ce 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -27,7 +27,6 @@
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -49,9 +48,9 @@
     private static final long serialVersionUID = 1L;
     private final Operation indexOp;
 
-    public PrimaryIndexModificationOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields,
+    public PrimaryIndexModificationOperationCallbackFactory(int datasetId, int[] primaryKeyFields,
             ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp, byte resourceType) {
-        super(txnId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
+        super(datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
     }
 
@@ -69,7 +68,7 @@
         try {
             IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId));
             DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource();
             IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
index 64cbbc9..72e48bf 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
@@ -26,7 +26,6 @@
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -39,9 +38,9 @@
 
     private static final long serialVersionUID = 1L;
 
-    public PrimaryIndexSearchOperationCallbackFactory(TxnId txnId, int datasetId, int[] entityIdFields,
+    public PrimaryIndexSearchOperationCallbackFactory(int datasetId, int[] entityIdFields,
             ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) {
-        super(txnId, datasetId, entityIdFields, txnSubsystemProvider, resourceType);
+        super(datasetId, entityIdFields, txnSubsystemProvider, resourceType);
     }
 
     @Override
@@ -51,7 +50,7 @@
         try {
             IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId));
             return new PrimaryIndexSearchOperationCallback(new DatasetId(datasetId), resourceId, primaryKeyFields,
                     txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 3fc42c9..0c20ee9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -27,7 +27,6 @@
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -45,9 +44,9 @@
     private static final long serialVersionUID = 1L;
     private final Operation indexOp;
 
-    public SecondaryIndexModificationOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields,
+    public SecondaryIndexModificationOperationCallbackFactory(int datasetId, int[] primaryKeyFields,
             ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp, byte resourceType) {
-        super(txnId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
+        super(datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
     }
 
@@ -65,7 +64,7 @@
         try {
             IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId));
             DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource();
             IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index da4aab8..c2f512f 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -26,7 +26,6 @@
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -44,9 +43,9 @@
     private static final long serialVersionUID = 1L;
     protected final Operation indexOp;
 
-    public UpsertOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields,
+    public UpsertOperationCallbackFactory(int datasetId, int[] primaryKeyFields,
             ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp, byte resourceType) {
-        super(txnId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
+        super(datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
     }
 
@@ -65,7 +64,7 @@
         try {
             IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId));
             IModificationOperationCallback modCallback = new UpsertOperationCallback(new DatasetId(datasetId),
                     primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(),
                     aResource.getPartition(), resourceType, indexOp);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 91db197..445ad4a 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -20,7 +20,6 @@
 package org.apache.asterix.transaction.management.runtime;
 
 import org.apache.asterix.common.api.IJobEventListenerFactory;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -31,16 +30,14 @@
 
     private static final long serialVersionUID = 1L;
 
-    protected final TxnId txnId;
     protected final int datasetId;
     protected final int[] primaryKeyFields;
     protected final boolean isWriteTransaction;
     protected int[] datasetPartitions;
     protected final boolean isSink;
 
-    public CommitRuntimeFactory(TxnId txnId, int datasetId, int[] primaryKeyFields, boolean isWriteTransaction,
+    public CommitRuntimeFactory(int datasetId, int[] primaryKeyFields, boolean isWriteTransaction,
             int[] datasetPartitions, boolean isSink) {
-        this.txnId = txnId;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
         this.isWriteTransaction = isWriteTransaction;
@@ -56,7 +53,8 @@
     @Override
     public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
         IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
-        return new CommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(txnId), datasetId, primaryKeyFields,
-                isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
+        return new CommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(datasetId), datasetId,
+                primaryKeyFields, isWriteTransaction,
+                datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
     }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2154
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I9526d50b31aebc3bf971d95ba3edf29c0c1066a7
Gerrit-PatchSet: 10
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sjaco002@ucr.edu>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Murtadha Hubail <mhubail@apache.org>
Gerrit-Reviewer: Steven Jacobs <sjaco002@ucr.edu>
Gerrit-Reviewer: Xikui Wang <xkkwww@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <bamousaa@gmail.com>

Mime
View raw message