asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sjaco...@apache.org
Subject asterixdb git commit: [ASTERIXDB-2167][TX][RT] Remove TxnId from JobSpecification
Date Thu, 30 Nov 2017 00:31:17 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master 0a0a2b3b5 -> 7650420d9


[ASTERIXDB-2167][TX][RT] Remove TxnId from JobSpecification

- user model changes: no
- storage format changes: no
- interface changes: IJobEventListenerFactory (getTxnId now takes an int datasetId instead of the compiled TxnId)

details:
- Remove the TxnId from the compiled job specification
- This enables one job spec to be used by multiple jobs/transactions
- Runtime operators that need the TxnId will pull it from the EventListener

Change-Id: I9526d50b31aebc3bf971d95ba3edf29c0c1066a7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2154
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/7650420d
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/7650420d
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/7650420d

Branch: refs/heads/master
Commit: 7650420d90a60974d05894491e9e58ede26f8996
Parents: 0a0a2b3
Author: Steven Glenn Jacobs <sjaco002@ucr.edu>
Authored: Tue Nov 28 18:05:48 2017 -0800
Committer: Steven Jacobs <sjaco002@ucr.edu>
Committed: Wed Nov 29 16:30:53 2017 -0800

----------------------------------------------------------------------
 .../operators/physical/CommitPOperator.java     |  7 +--
 .../physical/InvertedIndexPOperator.java        |  2 -
 .../rules/SetupCommitExtensionOpRule.java       |  8 +---
 .../apache/asterix/utils/FeedOperations.java    | 11 +++--
 .../org/apache/asterix/utils/RebalanceUtil.java |  8 ++--
 .../app/bootstrap/TestNodeController.java       |  6 +--
 .../common/api/IJobEventListenerFactory.java    |  4 +-
 .../AbstractOperationCallbackFactory.java       |  4 +-
 .../metadata/declared/MetadataProvider.java     | 18 +++-----
 .../asterix/metadata/entities/Dataset.java      | 46 ++++++++------------
 .../asterix/metadata/utils/DatasetUtil.java     | 15 +++----
 .../asterix/metadata/utils/IndexUtil.java       |  5 +--
 .../utils/SecondaryBTreeOperationsHelper.java   |  7 +--
 ...econdaryCorrelatedBTreeOperationsHelper.java |  6 +--
 ...CorrelatedInvertedIndexOperationsHelper.java |  6 +--
 ...econdaryCorrelatedRTreeOperationsHelper.java |  6 +--
 ...daryCorrelatedTreeIndexOperationsHelper.java |  8 ++--
 .../SecondaryInvertedIndexOperationsHelper.java |  6 +--
 .../utils/SecondaryRTreeOperationsHelper.java   |  7 +--
 .../job/listener/JobEventListenerFactory.java   |  6 +--
 ...tiTransactionJobletEventListenerFactory.java | 28 ++++++------
 .../LockThenSearchOperationCallbackFactory.java |  7 ++-
 ...exInstantSearchOperationCallbackFactory.java |  7 ++-
 ...dexModificationOperationCallbackFactory.java |  7 ++-
 ...maryIndexSearchOperationCallbackFactory.java |  7 ++-
 ...dexModificationOperationCallbackFactory.java |  7 ++-
 .../UpsertOperationCallbackFactory.java         |  7 ++-
 .../runtime/CommitRuntimeFactory.java           | 10 ++---
 28 files changed, 106 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
index abd18aa..09092ff 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -21,7 +21,6 @@ package org.apache.asterix.algebra.operators.physical;
 
 import java.util.List;
 
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -43,12 +42,10 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 public class CommitPOperator extends AbstractPhysicalOperator {
 
     private final List<LogicalVariable> primaryKeyLogicalVars;
-    private final TxnId txnId;
     private final Dataset dataset;
     private final boolean isSink;
 
-    public CommitPOperator(TxnId txnId, Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars, boolean isSink) {
-        this.txnId = txnId;
+    public CommitPOperator(Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars, boolean isSink) {
         this.dataset = dataset;
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
         this.isSink = isSink;
@@ -87,7 +84,7 @@ public class CommitPOperator extends AbstractPhysicalOperator {
         int[] primaryKeyFields = JobGenHelper.variablesToFieldIndexes(primaryKeyLogicalVars, inputSchemas[0]);
 
         //get dataset splits
-        IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(metadataProvider, txnId, primaryKeyFields,
+        IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(metadataProvider, primaryKeyFields,
                 isSink);
         builder.contributeMicroOperator(op, runtime, recDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index c941320..c3cc0ae 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -30,7 +30,6 @@ import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.optimizer.rules.am.InvertedIndexAccessMethod;
 import org.apache.asterix.optimizer.rules.am.InvertedIndexAccessMethod.SearchModifierType;
 import org.apache.asterix.optimizer.rules.am.InvertedIndexJobGenParams;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -158,7 +157,6 @@ public class InvertedIndexPOperator extends IndexSearchPOperator {
                 jobSpec, outputRecDesc, queryField, dataflowHelperFactory, queryTokenizerFactory, searchModifierFactory,
                 retainInput, retainMissing, context.getMissingWriterFactory(),
                 dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(), secondaryIndex,
-                        ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getTxnId(),
                         IndexOperation.SEARCH, null),
                 minFilterFieldIndexes, maxFilterFieldIndexes, isFullTextSearchQuery, numPrimaryKeys,
                 propagateIndexFilter);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
index 61339bf..7dfe161 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
@@ -23,9 +23,7 @@ import java.util.List;
 
 import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.algebra.operators.physical.CommitPOperator;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
-import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -99,14 +97,10 @@ public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule {
             primaryKeyLogicalVars.add(new LogicalVariable(varRefExpr.getVariableReference().getId()));
         }
 
-        //get TxnId(TransactorId)
-        MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
-        TxnId txnId = mp.getTxnId();
-
         //create the logical and physical operator
         CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, isSink);
         CommitPOperator commitPOperator =
-                new CommitPOperator(txnId, dataset, primaryKeyLogicalVars, isSink);
+                new CommitPOperator(dataset, primaryKeyLogicalVars, isSink);
         commitOperator.setPhysicalOperator(commitPOperator);
 
         //create ExtensionOperator and put the commitOperator in it.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index e42b5e5..9dddda4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -279,7 +279,7 @@ public class FeedOperations {
         Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorIdMapping = new HashMap<>();
         Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = new HashMap<>();
         Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>();
-        List<TxnId> txnIds = new ArrayList<>();
+        Map<Integer, TxnId> txnIdMap = new HashMap<>();
         FeedMetaOperatorDescriptor metaOp;
 
         for (int iter1 = 0; iter1 < jobsList.size(); iter1++) {
@@ -415,11 +415,16 @@ public class FeedOperations {
             for (OperatorDescriptorId root : subJob.getRoots()) {
                 jobSpec.addRoot(jobSpec.getOperatorMap().get(operatorIdMapping.get(root)));
             }
-            txnIds.add(((JobEventListenerFactory) subJob.getJobletEventListenerFactory()).getTxnId());
+            int datasetId = metadataProvider
+                    .findDataset(curFeedConnection.getDataverseName(), curFeedConnection.getDatasetName())
+                    .getDatasetId();
+            TxnId txnId = ((JobEventListenerFactory) subJob.getJobletEventListenerFactory()).getTxnId(datasetId);
+            txnIdMap.put(datasetId, txnId);
         }
 
         // jobEventListenerFactory
-        jobSpec.setJobletEventListenerFactory(new MultiTransactionJobletEventListenerFactory(txnIds, true));
+        jobSpec.setJobletEventListenerFactory(
+                new MultiTransactionJobletEventListenerFactory(txnIdMap, true));
         // useConnectorSchedulingPolicy
         jobSpec.setUseConnectorPolicyForScheduling(jobsList.get(0).isUseConnectorPolicyForScheduling());
         // connectorAssignmentPolicy

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 42f577f..a04c994 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -283,13 +283,13 @@ public class RebalanceUtil {
         IOperatorDescriptor starter = DatasetUtil.createDummyKeyProviderOp(spec, source, metadataProvider);
 
         // Creates primary index scan op.
-        IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, source, txnId);
+        IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, source);
 
         // Creates secondary BTree upsert op.
         IOperatorDescriptor upsertOp = createPrimaryIndexUpsertOp(spec, metadataProvider, source, target);
 
         // The final commit operator.
-        IOperatorDescriptor commitOp = createUpsertCommitOp(spec, metadataProvider, txnId, target);
+        IOperatorDescriptor commitOp = createUpsertCommitOp(spec, metadataProvider, target);
 
         // Connects empty-tuple-source and scan.
         spec.connect(new OneToOneConnectorDescriptor(spec), starter, 0, primaryScanOp, 0);
@@ -326,11 +326,11 @@ public class RebalanceUtil {
 
     // Creates the commit operator for populating the target dataset.
     private static IOperatorDescriptor createUpsertCommitOp(JobSpecification spec, MetadataProvider metadataProvider,
-            TxnId txnId, Dataset target) throws AlgebricksException {
+            Dataset target) throws AlgebricksException {
         int[] primaryKeyFields = getPrimaryKeyPermutationForUpsert(target);
         return new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
                 new IPushRuntimeFactory[] {
-                        target.getCommitRuntimeFactory(metadataProvider, txnId, primaryKeyFields, true) },
+                        target.getCommitRuntimeFactory(metadataProvider, primaryKeyFields, true) },
                 new RecordDescriptor[] { target.getPrimaryRecordDescriptor(metadataProvider) });
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 352a5f8..a1c2ee6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -183,7 +183,7 @@ public class TestNodeController {
                     mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators);
             IndexOperation op = IndexOperation.INSERT;
             IModificationOperationCallbackFactory modOpCallbackFactory =
-                    new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(ctx), dataset.getDatasetId(),
+                    new PrimaryIndexModificationOperationCallbackFactory(dataset.getDatasetId(),
                             primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, Operation.get(op),
                             ResourceType.LSM_BTREE);
             IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
@@ -614,9 +614,9 @@ public class TestNodeController {
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, keyTypes, recordType, metaType,
                 mergePolicy.first, mergePolicy.second, filterFields, keyIndexes, keyIndicators);
         IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
-                storageComponentProvider, primaryIndexInfo.index, getTxnJobId(ctx), IndexOperation.UPSERT, keyIndexes);
+                storageComponentProvider, primaryIndexInfo.index, IndexOperation.UPSERT, keyIndexes);
         ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
-                storageComponentProvider, primaryIndexInfo.index, getTxnJobId(ctx), IndexOperation.UPSERT, keyIndexes);
+                storageComponentProvider, primaryIndexInfo.index, IndexOperation.UPSERT, keyIndexes);
         IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
         IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
                 storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
index 0f37b13..acb3ae8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
@@ -22,9 +22,9 @@ import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 
 /**
- * an interface for JobEventListenerFactories to add Asterix transaction JobId getter
+ * an interface for JobEventListenerFactories to add Asterix txnId getter
  */
 public interface IJobEventListenerFactory extends IJobletEventListenerFactory {
 
-    TxnId getTxnId(TxnId compiledTxnId);
+    TxnId getTxnId(int datasetId);
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
index d2b1276..ce6671b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
@@ -26,15 +26,13 @@ import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 public abstract class AbstractOperationCallbackFactory implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    protected final TxnId txnId;
     protected final int datasetId;
     protected final int[] primaryKeyFields;
     protected final ITransactionSubsystemProvider txnSubsystemProvider;
     protected final byte resourceType;
 
-    public AbstractOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields,
+    public AbstractOperationCallbackFactory(int datasetId, int[] primaryKeyFields,
             ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) {
-        this.txnId = txnId;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
         this.txnSubsystemProvider = txnSubsystemProvider;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 305cdfa..1e0d597 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -84,7 +84,6 @@ import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.utils.NonTaggedFormatUtil;
 import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
 import org.apache.asterix.runtime.formats.FormatUtils;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor;
 import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
 import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
@@ -446,7 +445,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         }
 
         ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
-                storageComponentProvider, theIndex, txnId, IndexOperation.SEARCH, primaryKeyFields);
+                storageComponentProvider, theIndex, IndexOperation.SEARCH, primaryKeyFields);
         IStorageManager storageManager = getStorageComponentProvider().getStorageManager();
         IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(storageManager, spPc.first);
         BTreeSearchOperatorDescriptor btreeSearchOp;
@@ -485,7 +484,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         }
 
         ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
-                storageComponentProvider, secondaryIndex, txnId, IndexOperation.SEARCH, primaryKeyFields);
+                storageComponentProvider, secondaryIndex, IndexOperation.SEARCH, primaryKeyFields);
         RTreeSearchOperatorDescriptor rtreeSearchOp;
         IIndexDataflowHelperFactory indexDataflowHelperFactory =
                 new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), spPc.first);
@@ -789,7 +788,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             // files index
             RecordDescriptor outRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
             ISearchOperationCallbackFactory searchOpCallbackFactory = dataset
-                    .getSearchCallbackFactory(storageComponentProvider, fileIndex, txnId, IndexOperation.SEARCH, null);
+                    .getSearchCallbackFactory(storageComponentProvider, fileIndex, IndexOperation.SEARCH, null);
             // Create the operator
             ExternalLookupOperatorDescriptor op = new ExternalLookupOperatorDescriptor(jobSpec, adapterFactory,
                     outRecDesc, indexDataflowHelperFactory, searchOpCallbackFactory,
@@ -959,7 +958,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             primaryKeyFields[i] = i;
         }
         IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
-                storageComponentProvider, primaryIndex, txnId, indexOp, primaryKeyFields);
+                storageComponentProvider, primaryIndex, indexOp, primaryKeyFields);
         IIndexDataflowHelperFactory idfh =
                 new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
         IOperatorDescriptor op;
@@ -1081,9 +1080,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
             // prepare callback
-            TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId();
             IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
-                    storageComponentProvider, secondaryIndex, txnId, indexOp, modificationCallbackPrimaryKeyFields);
+                    storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
             IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(
                     storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
             IOperatorDescriptor op;
@@ -1179,9 +1177,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                 getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
 
         // prepare callback
-        TxnId planTxnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId();
         IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
-                storageComponentProvider, secondaryIndex, planTxnId, indexOp, modificationCallbackPrimaryKeyFields);
+                storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
         IIndexDataflowHelperFactory indexDataflowHelperFactory =
                 new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
         IOperatorDescriptor op;
@@ -1289,9 +1286,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                     getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
 
             // prepare callback
-            TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId();
             IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
-                    storageComponentProvider, secondaryIndex, txnId, indexOp, modificationCallbackPrimaryKeyFields);
+                    storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
             IIndexDataflowHelperFactory indexDataFlowFactory = new IndexDataflowHelperFactory(
                     storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
             IOperatorDescriptor op;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index e6c0de8..2386d77 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -42,7 +42,6 @@ import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallba
 import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
 import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.common.utils.JobUtils.ProgressState;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
@@ -541,8 +540,6 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
      *
      * @param index
      *            the index
-     * @param txnId
-     *            the job id being compiled
      * @param op
      *            the operation this search is part of
      * @param primaryKeyFields
@@ -553,21 +550,21 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
      *             if the callback factory could not be created
      */
     public ISearchOperationCallbackFactory getSearchCallbackFactory(IStorageComponentProvider storageComponentProvider,
-            Index index, TxnId txnId, IndexOperation op, int[] primaryKeyFields) throws AlgebricksException {
+            Index index, IndexOperation op, int[] primaryKeyFields) throws AlgebricksException {
         if (index.isPrimaryIndex()) {
             /*
              * Due to the read-committed isolation level,
              * we may acquire very short duration lock(i.e., instant lock) for readers.
              */
-            return (op == IndexOperation.UPSERT) ?
-                    new LockThenSearchOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields,
-                            storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE) :
-                    new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields,
+            return (op == IndexOperation.UPSERT)
+                    ? new LockThenSearchOperationCallbackFactory(getDatasetId(), primaryKeyFields,
+                            storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE)
+                    : new PrimaryIndexInstantSearchOperationCallbackFactory(getDatasetId(), primaryKeyFields,
                             storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE);
         } else if (index.getKeyFieldNames().isEmpty()) {
             // this is the case where the index is secondary primary index and locking is required
             // since the secondary primary index replaces the dataset index (which locks)
-            return new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields,
+            return new PrimaryIndexInstantSearchOperationCallbackFactory(getDatasetId(), primaryKeyFields,
                     storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE);
         }
         return new SecondaryIndexSearchOperationCallbackFactory();
@@ -578,8 +575,6 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
      *
      * @param index
      *            the index
-     * @param txnId
-     *            the job id of the job being compiled
      * @param op
      *            the operation performed for this callback
      * @param primaryKeyFields
@@ -590,24 +585,23 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
      *             If the callback factory could not be created
      */
     public IModificationOperationCallbackFactory getModificationCallbackFactory(
-            IStorageComponentProvider componentProvider, Index index, TxnId txnId, IndexOperation op,
+            IStorageComponentProvider componentProvider, Index index, IndexOperation op,
             int[] primaryKeyFields) throws AlgebricksException {
         if (index.isPrimaryIndex()) {
-            return op == IndexOperation.UPSERT ?
-                    new UpsertOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields,
+            return op == IndexOperation.UPSERT ? new UpsertOperationCallbackFactory(getDatasetId(), primaryKeyFields,
                             componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
-                            index.resourceType()) :
-                    op == IndexOperation.DELETE || op == IndexOperation.INSERT ?
-                            new PrimaryIndexModificationOperationCallbackFactory(txnId, getDatasetId(),
+                            index.resourceType())
+                    : op == IndexOperation.DELETE || op == IndexOperation.INSERT
+                            ? new PrimaryIndexModificationOperationCallbackFactory(getDatasetId(),
                                     primaryKeyFields, componentProvider.getTransactionSubsystemProvider(),
-                                    Operation.get(op), index.resourceType()) :
-                            NoOpOperationCallbackFactory.INSTANCE;
+                                    Operation.get(op), index.resourceType())
+                            : NoOpOperationCallbackFactory.INSTANCE;
         } else {
-            return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT ?
-                    new SecondaryIndexModificationOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields,
+            return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT
+                    ? new SecondaryIndexModificationOperationCallbackFactory(getDatasetId(), primaryKeyFields,
                             componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
-                            index.resourceType()) :
-                    NoOpOperationCallbackFactory.INSTANCE;
+                            index.resourceType())
+                    : NoOpOperationCallbackFactory.INSTANCE;
         }
     }
 
@@ -651,8 +645,6 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
      *
      * @param metadataProvider,
      *            the metadata provider.
-     * @param txnId,
-     *            the AsterixDB job id for transaction management.
      * @param primaryKeyFieldPermutation,
      *            the primary key field permutation according to the input.
      * @param isSink,
@@ -660,10 +652,10 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
      * @return the commit runtime factory for inserting/upserting/deleting operations on this dataset.
      * @throws AlgebricksException
      */
-    public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider, TxnId txnId,
+    public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider,
             int[] primaryKeyFieldPermutation, boolean isSink) throws AlgebricksException {
         int[] datasetPartitions = getDatasetPartitions(metadataProvider);
-        return new CommitRuntimeFactory(txnId, datasetId, primaryKeyFieldPermutation,
+        return new CommitRuntimeFactory(datasetId, primaryKeyFieldPermutation,
                 metadataProvider.isWriteTransaction(), datasetPartitions, isSink);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 5973c06..3d05c0e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -38,7 +38,6 @@ import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.transactions.IRecoveryManager;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.formats.base.IDataFormat;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -57,7 +56,6 @@ import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
@@ -335,13 +333,11 @@ public class DatasetUtil {
      *            the metadata provider.
      * @param dataset,
      *            the dataset to scan.
-     * @param txnId,
-     *            the AsterixDB job id for transaction management.
      * @return a primary index scan operator.
      * @throws AlgebricksException
      */
     public static IOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec, MetadataProvider metadataProvider,
-            Dataset dataset, TxnId txnId) throws AlgebricksException {
+            Dataset dataset) throws AlgebricksException {
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
                 metadataProvider.getSplitProviderAndConstraints(dataset);
         IFileSplitProvider primaryFileSplitProvider = primarySplitsAndConstraint.first;
@@ -351,8 +347,8 @@ public class DatasetUtil {
         // +Infinity
         int[] highKeyFields = null;
         ITransactionSubsystemProvider txnSubsystemProvider = TransactionSubsystemProvider.INSTANCE;
-        ISearchOperationCallbackFactory searchCallbackFactory =
-                new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, dataset.getDatasetId(),
+        ISearchOperationCallbackFactory searchCallbackFactory = new PrimaryIndexInstantSearchOperationCallbackFactory(
+                dataset.getDatasetId(),
                         dataset.getPrimaryBloomFilterFields(), txnSubsystemProvider,
                         IRecoveryManager.ResourceType.LSM_BTREE);
         IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
@@ -396,7 +392,6 @@ public class DatasetUtil {
                 metadataProvider.getSplitProviderAndConstraints(dataset);
 
         // prepare callback
-        TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId(null);
         int[] primaryKeyFields = new int[numKeys];
         for (int i = 0; i < numKeys; i++) {
             primaryKeyFields[i] = i;
@@ -405,9 +400,9 @@ public class DatasetUtil {
                 metadataProvider.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName()).size() > 1;
         IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
         IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
-                storageComponentProvider, primaryIndex, txnId, IndexOperation.UPSERT, primaryKeyFields);
+                storageComponentProvider, primaryIndex, IndexOperation.UPSERT, primaryKeyFields);
         ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
-                storageComponentProvider, primaryIndex, txnId, IndexOperation.UPSERT, primaryKeyFields);
+                storageComponentProvider, primaryIndex, IndexOperation.UPSERT, primaryKeyFields);
         IIndexDataflowHelperFactory idfh =
                 new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
         LSMPrimaryUpsertOperatorDescriptor op;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index e6a24e3..2ebfe78 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.metadata.utils;
 
-import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.*;
+import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
 
 import java.util.EnumSet;
 import java.util.List;
@@ -161,13 +161,12 @@ public class IndexUtil {
      *            the metadata provider.
      * @return the AsterixDB job id for transaction management.
      */
-    public static TxnId bindJobEventListener(JobSpecification spec, MetadataProvider metadataProvider) {
+    public static void bindJobEventListener(JobSpecification spec, MetadataProvider metadataProvider) {
         TxnId txnId = TxnIdFactory.create();
         metadataProvider.setTxnId(txnId);
         boolean isWriteTransaction = metadataProvider.isWriteTransaction();
         IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(txnId, isWriteTransaction);
         spec.setJobletEventListenerFactory(jobEventListenerFactory);
-        return txnId;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
index 8f70f21..41def96 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
@@ -22,7 +22,6 @@ import java.util.List;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.GlobalConfig;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -129,11 +128,10 @@ public class SecondaryBTreeOperationsHelper extends SecondaryTreeIndexOperations
             // Create dummy key provider for feeding the primary index scan.
             IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset,
                     metadataProvider);
-            TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+            IndexUtil.bindJobEventListener(spec, metadataProvider);
 
             // Create primary index scan op.
-            IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset,
-                    txnId);
+            IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset);
 
             // Assign op.
             IOperatorDescriptor sourceOp = primaryScanOp;
@@ -199,7 +197,6 @@ public class SecondaryBTreeOperationsHelper extends SecondaryTreeIndexOperations
      *      ====== ========= ........ ........
      */
     @Override
-    @SuppressWarnings("rawtypes")
     protected void setSecondaryRecDescAndComparators() throws AlgebricksException {
         int numSecondaryKeys = index.getKeyFieldNames().size();
         secondaryFieldAccessEvalFactories = new IScalarEvaluatorFactory[numSecondaryKeys + numFilterFields];

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
index 89bd4b1..7791cad 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
@@ -21,7 +21,6 @@ package org.apache.asterix.metadata.utils;
 import java.util.List;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
@@ -72,14 +71,14 @@ public class SecondaryCorrelatedBTreeOperationsHelper extends SecondaryCorrelate
 
         // only handle internal datasets
         // Create dummy key provider for feeding the primary index scan.
-        TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+        IndexUtil.bindJobEventListener(spec, metadataProvider);
 
         // Create dummy key provider for feeding the primary index scan.
         IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
 
         // Create primary index scan op.
         IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
-                getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), txnId);
+                getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)));
 
         // Assign op.
         IOperatorDescriptor sourceOp = primaryScanOp;
@@ -124,7 +123,6 @@ public class SecondaryCorrelatedBTreeOperationsHelper extends SecondaryCorrelate
     }
 
     @Override
-    @SuppressWarnings("rawtypes")
     protected void setSecondaryRecDescAndComparators() throws AlgebricksException {
         int numSecondaryKeys = index.getKeyFieldNames().size();
         secondaryFieldAccessEvalFactories = new IScalarEvaluatorFactory[numSecondaryKeys + numFilterFields];

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
index 93cc11d..b91d65f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
@@ -21,7 +21,6 @@ package org.apache.asterix.metadata.utils;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -79,7 +78,6 @@ public class SecondaryCorrelatedInvertedIndexOperationsHelper extends SecondaryC
     }
 
     @Override
-    @SuppressWarnings("rawtypes")
     protected void setSecondaryRecDescAndComparators() throws AlgebricksException {
         int numSecondaryKeys = index.getKeyFieldNames().size();
         IndexType indexType = index.getIndexType();
@@ -206,14 +204,14 @@ public class SecondaryCorrelatedInvertedIndexOperationsHelper extends SecondaryC
     @Override
     public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
         JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+        IndexUtil.bindJobEventListener(spec, metadataProvider);
 
         // Create dummy key provider for feeding the primary index scan.
         IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
 
         // Create primary index scan op.
         IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
-                getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), txnId);
+                getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)));
 
         IOperatorDescriptor sourceOp = primaryScanOp;
         boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
index 1333493..bf5178c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
@@ -23,7 +23,6 @@ import java.util.List;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
@@ -52,7 +51,6 @@ import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 
-@SuppressWarnings("rawtypes")
 public class SecondaryCorrelatedRTreeOperationsHelper extends SecondaryCorrelatedTreeIndexOperationsHelper {
 
     protected IPrimitiveValueProviderFactory[] valueProviderFactories;
@@ -184,11 +182,11 @@ public class SecondaryCorrelatedRTreeOperationsHelper extends SecondaryCorrelate
 
         // Create dummy key provider for feeding the primary index scan.
         IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
-        TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+        IndexUtil.bindJobEventListener(spec, metadataProvider);
 
         // Create primary index scan op.
         IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
-                getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), txnId);
+                getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)));
 
         // Assign op.
         IOperatorDescriptor sourceOp = primaryScanOp;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
index 2a4a952..0a772fa 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
@@ -24,7 +24,6 @@ import org.apache.asterix.common.context.TransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.transactions.IRecoveryManager;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -102,7 +101,6 @@ public abstract class SecondaryCorrelatedTreeIndexOperationsHelper extends Secon
     }
 
     protected RecordDescriptor getTaggedRecordDescriptor(RecordDescriptor recDescriptor) {
-        @SuppressWarnings("rawtypes")
         ISerializerDeserializer[] fields =
                 new ISerializerDeserializer[recDescriptor.getFields().length + NUM_TAG_FIELDS];
         ITypeTraits[] traits = null;
@@ -273,10 +271,10 @@ public abstract class SecondaryCorrelatedTreeIndexOperationsHelper extends Secon
     }
 
     protected IOperatorDescriptor createPrimaryIndexScanDiskComponentsOp(JobSpecification spec,
-            MetadataProvider metadataProvider, RecordDescriptor outRecDesc, TxnId txnId) throws AlgebricksException {
+            MetadataProvider metadataProvider, RecordDescriptor outRecDesc) throws AlgebricksException {
         ITransactionSubsystemProvider txnSubsystemProvider = TransactionSubsystemProvider.INSTANCE;
-        ISearchOperationCallbackFactory searchCallbackFactory =
-                new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, dataset.getDatasetId(),
+        ISearchOperationCallbackFactory searchCallbackFactory = new PrimaryIndexInstantSearchOperationCallbackFactory(
+                dataset.getDatasetId(),
                         dataset.getPrimaryBloomFilterFields(), txnSubsystemProvider,
                         IRecoveryManager.ResourceType.LSM_BTREE);
         IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
index 3626f16..1c9eb74 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
@@ -22,7 +22,6 @@ import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -82,7 +81,6 @@ public class SecondaryInvertedIndexOperationsHelper extends SecondaryTreeIndexOp
     }
 
     @Override
-    @SuppressWarnings("rawtypes")
     protected void setSecondaryRecDescAndComparators() throws AlgebricksException {
         int numSecondaryKeys = index.getKeyFieldNames().size();
         IndexType indexType = index.getIndexType();
@@ -208,14 +206,14 @@ public class SecondaryInvertedIndexOperationsHelper extends SecondaryTreeIndexOp
     @Override
     public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
         JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+        IndexUtil.bindJobEventListener(spec, metadataProvider);
 
         // Create dummy key provider for feeding the primary index scan.
         IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
 
         // Create primary index scan op.
         IOperatorDescriptor primaryScanOp =
-                DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset, txnId);
+                DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset);
 
         IOperatorDescriptor sourceOp = primaryScanOp;
         boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
index 613df21..8e6e0e9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
@@ -23,7 +23,6 @@ import java.util.List;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
@@ -58,7 +57,6 @@ import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 
-@SuppressWarnings("rawtypes")
 public class SecondaryRTreeOperationsHelper extends SecondaryTreeIndexOperationsHelper {
 
     protected IPrimitiveValueProviderFactory[] valueProviderFactories;
@@ -201,11 +199,10 @@ public class SecondaryRTreeOperationsHelper extends SecondaryTreeIndexOperations
         if (dataset.getDatasetType() == DatasetType.INTERNAL) {
             // Create dummy key provider for feeding the primary index scan.
             IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
-            TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+            IndexUtil.bindJobEventListener(spec, metadataProvider);
 
             // Create primary index scan op.
-            IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset,
-                    txnId);
+            IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset);
 
             // Assign op.
             IOperatorDescriptor sourceOp = primaryScanOp;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
index d3c3fe7..0de61ff 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -48,12 +48,8 @@ public class JobEventListenerFactory implements IJobEventListenerFactory {
         this.transactionalWrite = transactionalWrite;
     }
 
-    public TxnId getTxnId() {
-        return txnId;
-    }
-
     @Override
-    public TxnId getTxnId(TxnId compiledTxnId) {
+    public TxnId getTxnId(int datasetId) {
         return txnId;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
index bfe1925..656ea09 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.runtime.job.listener;
 
-import java.util.List;
+import java.util.Map;
 
 import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -40,23 +40,22 @@ import org.apache.hyracks.api.job.JobStatus;
 public class MultiTransactionJobletEventListenerFactory implements IJobEventListenerFactory {
 
     private static final long serialVersionUID = 1L;
-    private final List<TxnId> txnIds;
+    private final Map<Integer, TxnId> txnIdMap;
     private final boolean transactionalWrite;
 
-    public MultiTransactionJobletEventListenerFactory(List<TxnId> txnIds, boolean transactionalWrite) {
-        this.txnIds = txnIds;
+    public MultiTransactionJobletEventListenerFactory(Map<Integer, TxnId> txnIdMap, boolean transactionalWrite) {
+        this.txnIdMap = txnIdMap;
         this.transactionalWrite = transactionalWrite;
     }
 
-    //TODO: Enable this factory to be usable for Deployed Jobs
     @Override
-    public TxnId getTxnId(TxnId compiledTxnId) {
-        return compiledTxnId;
+    public TxnId getTxnId(int datasetId) {
+        return txnIdMap.get(datasetId);
     }
 
     @Override
     public IJobletEventListenerFactory copyFactory() {
-        return new MultiTransactionJobletEventListenerFactory(txnIds, transactionalWrite);
+        return new MultiTransactionJobletEventListenerFactory(txnIdMap, transactionalWrite);
     }
 
     @Override
@@ -74,13 +73,13 @@ public class MultiTransactionJobletEventListenerFactory implements IJobEventList
                     ITransactionManager txnManager =
                             ((INcApplicationContext) jobletContext.getServiceContext().getApplicationContext())
                                     .getTransactionSubsystem().getTransactionManager();
-                    for (TxnId txnId : txnIds) {
-                        ITransactionContext txnContext = txnManager.getTransactionContext(txnId);
+                    for (TxnId subTxnId : txnIdMap.values()) {
+                        ITransactionContext txnContext = txnManager.getTransactionContext(subTxnId);
                         txnContext.setWriteTxn(transactionalWrite);
                         if (jobStatus != JobStatus.FAILURE) {
-                            txnManager.commitTransaction(txnId);
+                            txnManager.commitTransaction(subTxnId);
                         } else {
-                            txnManager.abortTransaction(txnId);
+                            txnManager.abortTransaction(subTxnId);
                         }
                     }
                 } catch (ACIDException e) {
@@ -93,9 +92,10 @@ public class MultiTransactionJobletEventListenerFactory implements IJobEventList
                 try {
                     TransactionOptions options =
                             new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL);
-                    for (TxnId txnId : txnIds) {
+                    for (TxnId subTxnId : txnIdMap.values()) {
                         ((INcApplicationContext) jobletContext.getServiceContext().getApplicationContext())
-                                .getTransactionSubsystem().getTransactionManager().beginTransaction(txnId, options);
+                                .getTransactionSubsystem().getTransactionManager()
+                                .beginTransaction(subTxnId, options);
                     }
                 } catch (ACIDException e) {
                     throw new Error(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
index 9f96263..1346b76 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
@@ -25,7 +25,6 @@ import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -37,9 +36,9 @@ public class LockThenSearchOperationCallbackFactory extends AbstractOperationCal
 
     private static final long serialVersionUID = 1L;
 
-    public LockThenSearchOperationCallbackFactory(TxnId txnId, int datasetId, int[] entityIdFields,
+    public LockThenSearchOperationCallbackFactory(int datasetId, int[] entityIdFields,
             ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) {
-        super(txnId, datasetId, entityIdFields, txnSubsystemProvider, resourceType);
+        super(datasetId, entityIdFields, txnSubsystemProvider, resourceType);
     }
 
     @Override
@@ -49,7 +48,7 @@ public class LockThenSearchOperationCallbackFactory extends AbstractOperationCal
         try {
             IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId));
             return new LockThenSearchOperationCallback(new DatasetId(datasetId), resourceId, primaryKeyFields,
                     txnSubsystem, txnCtx, operatorNodePushable);
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
index f9c8e3c..d4242bf 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
@@ -26,7 +26,6 @@ import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -39,9 +38,9 @@ public class PrimaryIndexInstantSearchOperationCallbackFactory extends AbstractO
 
     private static final long serialVersionUID = 1L;
 
-    public PrimaryIndexInstantSearchOperationCallbackFactory(TxnId txnId, int datasetId, int[] entityIdFields,
+    public PrimaryIndexInstantSearchOperationCallbackFactory(int datasetId, int[] entityIdFields,
             ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) {
-        super(txnId, datasetId, entityIdFields, txnSubsystemProvider, resourceType);
+        super(datasetId, entityIdFields, txnSubsystemProvider, resourceType);
     }
 
     @Override
@@ -51,7 +50,7 @@ public class PrimaryIndexInstantSearchOperationCallbackFactory extends AbstractO
         try {
             IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId));
             return new PrimaryIndexInstantSearchOperationCallback(new DatasetId(datasetId), resourceId,
                     primaryKeyFields, txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 8f5e386..97fd7ce 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -27,7 +27,6 @@ import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -49,9 +48,9 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
     private static final long serialVersionUID = 1L;
     private final Operation indexOp;
 
-    public PrimaryIndexModificationOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields,
+    public PrimaryIndexModificationOperationCallbackFactory(int datasetId, int[] primaryKeyFields,
             ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp, byte resourceType) {
-        super(txnId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
+        super(datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
     }
 
@@ -69,7 +68,7 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
         try {
             IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId));
             DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource();
             IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
index 64cbbc9..72e48bf 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
@@ -26,7 +26,6 @@ import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -39,9 +38,9 @@ public class PrimaryIndexSearchOperationCallbackFactory extends AbstractOperatio
 
     private static final long serialVersionUID = 1L;
 
-    public PrimaryIndexSearchOperationCallbackFactory(TxnId txnId, int datasetId, int[] entityIdFields,
+    public PrimaryIndexSearchOperationCallbackFactory(int datasetId, int[] entityIdFields,
             ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) {
-        super(txnId, datasetId, entityIdFields, txnSubsystemProvider, resourceType);
+        super(datasetId, entityIdFields, txnSubsystemProvider, resourceType);
     }
 
     @Override
@@ -51,7 +50,7 @@ public class PrimaryIndexSearchOperationCallbackFactory extends AbstractOperatio
         try {
             IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId));
             return new PrimaryIndexSearchOperationCallback(new DatasetId(datasetId), resourceId, primaryKeyFields,
                     txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 3fc42c9..0c20ee9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -27,7 +27,6 @@ import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -45,9 +44,9 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract
     private static final long serialVersionUID = 1L;
     private final Operation indexOp;
 
-    public SecondaryIndexModificationOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields,
+    public SecondaryIndexModificationOperationCallbackFactory(int datasetId, int[] primaryKeyFields,
             ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp, byte resourceType) {
-        super(txnId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
+        super(datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
     }
 
@@ -65,7 +64,7 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract
         try {
             IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId));
             DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource();
             IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index da4aab8..c2f512f 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -26,7 +26,6 @@ import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -44,9 +43,9 @@ public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFac
     private static final long serialVersionUID = 1L;
     protected final Operation indexOp;
 
-    public UpsertOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields,
+    public UpsertOperationCallbackFactory(int datasetId, int[] primaryKeyFields,
             ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp, byte resourceType) {
-        super(txnId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
+        super(datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
     }
 
@@ -65,7 +64,7 @@ public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFac
         try {
             IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId));
+                    .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId));
             IModificationOperationCallback modCallback = new UpsertOperationCallback(new DatasetId(datasetId),
                     primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(),
                     aResource.getPartition(), resourceType, indexOp);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7650420d/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 91db197..445ad4a 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -20,7 +20,6 @@
 package org.apache.asterix.transaction.management.runtime;
 
 import org.apache.asterix.common.api.IJobEventListenerFactory;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -31,16 +30,14 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory {
 
     private static final long serialVersionUID = 1L;
 
-    protected final TxnId txnId;
     protected final int datasetId;
     protected final int[] primaryKeyFields;
     protected final boolean isWriteTransaction;
     protected int[] datasetPartitions;
     protected final boolean isSink;
 
-    public CommitRuntimeFactory(TxnId txnId, int datasetId, int[] primaryKeyFields, boolean isWriteTransaction,
+    public CommitRuntimeFactory(int datasetId, int[] primaryKeyFields, boolean isWriteTransaction,
             int[] datasetPartitions, boolean isSink) {
-        this.txnId = txnId;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
         this.isWriteTransaction = isWriteTransaction;
@@ -56,7 +53,8 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory {
     @Override
     public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
         IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
-        return new CommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(txnId), datasetId, primaryKeyFields,
-                isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
+        return new CommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(datasetId), datasetId,
+                primaryKeyFields, isWriteTransaction,
+                datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
     }
 }


Mime
View raw message