asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhub...@apache.org
Subject [6/6] asterixdb git commit: [NO ISSUE][TX] Rename Asterix JobId to TxnId
Date Tue, 14 Nov 2017 17:42:54 GMT
[NO ISSUE][TX] Rename Asterix JobId to TxnId

- user model changes: no
- storage format changes: yes
  - Txn log jobId changed from int to long.
- interface changes: yes
  Update APIs to use long TxnId instead of int JobId

Details:
- Rename TxnId -> TxnEntityId.
- Rename Asterix JobId -> TxnId.
- Rename Asterix JobIdFactory -> TxnIdFactory.
- Change TxnId size from int to long and update
  log sizes accordingly.

Change-Id: I0905595a50195b83c1afae5dde88e5502ad21b9f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2149
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/592af654
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/592af654
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/592af654

Branch: refs/heads/master
Commit: 592af6545d77a66d60a6fb02965d7ad6747e4e3a
Parents: 9eead00
Author: Murtadha Hubail <mhubail@apache.org>
Authored: Tue Nov 14 04:41:31 2017 +0300
Committer: Murtadha Hubail <mhubail@apache.org>
Committed: Tue Nov 14 09:41:53 2017 -0800

----------------------------------------------------------------------
 .../operators/physical/CommitPOperator.java     |  11 +-
 .../physical/InvertedIndexPOperator.java        |   3 +-
 .../rules/SetupCommitExtensionOpRule.java       |   8 +-
 .../apache/asterix/api/common/APIFramework.java |   9 +-
 .../apache/asterix/app/nc/RecoveryManager.java  | 143 +++---
 .../apache/asterix/utils/FeedOperations.java    |   8 +-
 .../apache/asterix/utils/FlushDatasetUtil.java  |   9 +-
 .../org/apache/asterix/utils/RebalanceUtil.java |  16 +-
 .../app/bootstrap/TestNodeController.java       |   5 +-
 .../AbstractOperationCallbackFactory.java       |   6 +-
 .../asterix/common/transactions/Checkpoint.java |  12 +-
 .../asterix/common/transactions/ILogRecord.java |  61 ++-
 .../common/transactions/IRecoveryManager.java   |   6 +-
 .../transactions/ITransactionContext.java       |   2 +-
 .../transactions/ITransactionManager.java       |  14 +-
 .../asterix/common/transactions/JobId.java      |  64 ---
 .../asterix/common/transactions/LogRecord.java  |  30 +-
 .../asterix/common/transactions/TxnId.java      |  61 +++
 .../asterix/common/utils/TransactionUtil.java   |  12 +-
 .../asterix/metadata/MetadataManager.java       | 142 +++---
 .../apache/asterix/metadata/MetadataNode.java   | 452 +++++++++----------
 .../metadata/MetadataTransactionContext.java    |  12 +-
 .../asterix/metadata/api/IMetadataNode.java     | 230 +++++-----
 .../asterix/metadata/api/IValueExtractor.java   |   6 +-
 .../metadata/declared/MetadataProvider.java     |  32 +-
 .../metadata/entities/BuiltinTypeMap.java       |   6 +-
 .../asterix/metadata/entities/Dataset.java      |  32 +-
 .../DatatypeTupleTranslator.java                |  16 +-
 .../IndexTupleTranslator.java                   |  16 +-
 .../MetadataTupleTranslatorProvider.java        |  10 +-
 .../asterix/metadata/utils/DatasetUtil.java     |  14 +-
 .../asterix/metadata/utils/IndexUtil.java       |  15 +-
 .../utils/SecondaryBTreeOperationsHelper.java   |   6 +-
 ...econdaryCorrelatedBTreeOperationsHelper.java |   6 +-
 ...CorrelatedInvertedIndexOperationsHelper.java |   6 +-
 ...econdaryCorrelatedRTreeOperationsHelper.java |   6 +-
 ...daryCorrelatedTreeIndexOperationsHelper.java |   8 +-
 .../SecondaryInvertedIndexOperationsHelper.java |   6 +-
 .../utils/SecondaryRTreeOperationsHelper.java   |   6 +-
 .../MetadataEntityValueExtractor.java           |   5 +-
 .../NestedDatatypeNameValueExtractor.java       |   4 +-
 .../TupleCopyValueExtractor.java                |   4 +-
 .../management/ReplicationChannel.java          |   4 +-
 .../management/ReplicationManager.java          |  60 +--
 .../job/listener/JobEventListenerFactory.java   |  16 +-
 ...tiTransactionJobletEventListenerFactory.java |  16 +-
 .../std/FlushDatasetOperatorDescriptor.java     |  10 +-
 .../asterix/tools/datagen/AdmDataGen.java       |   4 +-
 ...tractIndexModificationOperationCallback.java |   2 +-
 .../LockThenSearchOperationCallback.java        |   2 +-
 .../LockThenSearchOperationCallbackFactory.java |   8 +-
 ...exInstantSearchOperationCallbackFactory.java |   8 +-
 ...dexModificationOperationCallbackFactory.java |   8 +-
 ...maryIndexSearchOperationCallbackFactory.java |   8 +-
 ...dexModificationOperationCallbackFactory.java |   8 +-
 ...dexModificationOperationCallbackFactory.java |   8 +-
 ...dexModificationOperationCallbackFactory.java |   8 +-
 .../UpsertOperationCallbackFactory.java         |   8 +-
 .../management/runtime/CommitRuntime.java       |  10 +-
 .../runtime/CommitRuntimeFactory.java           |  10 +-
 .../service/locking/ConcurrentLockManager.java  |  64 +--
 .../service/locking/DumpTablePrinter.java       |  12 +-
 .../management/service/locking/Job.json         |   4 +-
 .../service/locking/ResourceTablePrinter.java   |   2 +-
 .../management/service/logging/LogBuffer.java   |  25 +-
 .../management/service/logging/LogManager.java  |   2 +-
 .../logging/LogManagerWithReplication.java      |  10 +-
 .../recovery/AbstractCheckpointManager.java     |   2 +-
 .../service/recovery/TxnEntityId.java           | 175 +++++++
 .../management/service/recovery/TxnId.java      | 175 -------
 .../service/transaction/JobIdFactory.java       |  38 --
 .../service/transaction/TransactionContext.java |  20 +-
 .../service/transaction/TransactionManager.java |  56 +--
 .../service/transaction/TxnIdFactory.java       |  42 ++
 .../service/locking/LockManagerUnitTest.java    |   8 +-
 .../management/service/locking/Locker.java      |   2 +-
 .../management/service/locking/Request.java     |   4 +-
 77 files changed, 1178 insertions(+), 1181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
index 2dd4c3d..abd18aa 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -21,7 +21,7 @@ package org.apache.asterix.algebra.operators.physical;
 
 import java.util.List;
 
-import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -39,17 +39,16 @@ import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.io.FileSplit;
 
 public class CommitPOperator extends AbstractPhysicalOperator {
 
     private final List<LogicalVariable> primaryKeyLogicalVars;
-    private final JobId jobId;
+    private final TxnId txnId;
     private final Dataset dataset;
     private final boolean isSink;
 
-    public CommitPOperator(JobId jobId, Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars, boolean isSink) {
-        this.jobId = jobId;
+    public CommitPOperator(TxnId txnId, Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars, boolean isSink) {
+        this.txnId = txnId;
         this.dataset = dataset;
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
         this.isSink = isSink;
@@ -88,7 +87,7 @@ public class CommitPOperator extends AbstractPhysicalOperator {
         int[] primaryKeyFields = JobGenHelper.variablesToFieldIndexes(primaryKeyLogicalVars, inputSchemas[0]);
 
         //get dataset splits
-        IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(metadataProvider, jobId, primaryKeyFields,
+        IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(metadataProvider, txnId, primaryKeyFields,
                 isSink);
         builder.contributeMicroOperator(op, runtime, recDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 12114f0..c941320 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.algebra.operators.physical;
 
-import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.declared.DataSourceId;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -159,7 +158,7 @@ public class InvertedIndexPOperator extends IndexSearchPOperator {
                 jobSpec, outputRecDesc, queryField, dataflowHelperFactory, queryTokenizerFactory, searchModifierFactory,
                 retainInput, retainMissing, context.getMissingWriterFactory(),
                 dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(), secondaryIndex,
-                        ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId(),
+                        ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getTxnId(),
                         IndexOperation.SEARCH, null),
                 minFilterFieldIndexes, maxFilterFieldIndexes, isFullTextSearchQuery, numPrimaryKeys,
                 propagateIndexFilter);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
index 18e5f5e..61339bf 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
@@ -23,7 +23,7 @@ import java.util.List;
 
 import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.algebra.operators.physical.CommitPOperator;
-import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -99,14 +99,14 @@ public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule {
             primaryKeyLogicalVars.add(new LogicalVariable(varRefExpr.getVariableReference().getId()));
         }
 
-        //get JobId(TransactorId)
+        //get TxnId(TransactorId)
         MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
-        JobId jobId = mp.getJobId();
+        TxnId txnId = mp.getTxnId();
 
         //create the logical and physical operator
         CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, isSink);
         CommitPOperator commitPOperator =
-                new CommitPOperator(jobId, dataset, primaryKeyLogicalVars, isSink);
+                new CommitPOperator(txnId, dataset, primaryKeyLogicalVars, isSink);
         commitOperator.setPhysicalOperator(commitPOperator);
 
         //create ExtensionOperator and put the commitOperator in it.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 2078288..f7ecefc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -38,6 +38,7 @@ import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.common.utils.Job;
 import org.apache.asterix.common.utils.Job.SubmissionMode;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
@@ -63,7 +64,7 @@ import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.optimizer.base.FuzzyUtils;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
-import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
+import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.asterix.translator.SessionConfig;
@@ -211,8 +212,8 @@ public class APIFramework {
             printPlanPostfix(output);
         }
 
-        org.apache.asterix.common.transactions.JobId asterixJobId = JobIdFactory.generateJobId();
-        metadataProvider.setJobId(asterixJobId);
+        TxnId txnId = TxnIdFactory.create();
+        metadataProvider.setTxnId(txnId);
         ILangExpressionToPlanTranslator t =
                 translatorFactory.createExpressionToPlanTranslator(metadataProvider, varCounter);
 
@@ -351,7 +352,7 @@ public class APIFramework {
         builder.setNormalizedKeyComputerFactoryProvider(format.getNormalizedKeyComputerFactoryProvider());
 
         JobEventListenerFactory jobEventListenerFactory =
-                new JobEventListenerFactory(asterixJobId, metadataProvider.isWriteTransaction());
+                new JobEventListenerFactory(txnId, metadataProvider.isWriteTransaction());
         JobSpecification spec = compiler.createJob(metadataProvider.getApplicationContext(), jobEventListenerFactory);
 
         // When the top-level statement is a query, the statement parameter is null.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 25385c6..2435b60 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -61,7 +61,7 @@ import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModifi
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
 import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager;
-import org.apache.asterix.transaction.management.service.recovery.TxnId;
+import org.apache.asterix.transaction.management.service.recovery.TxnEntityId;
 import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.application.INCServiceContext;
@@ -87,7 +87,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
     private final LogManager logMgr;
     private final boolean replicationEnabled;
     private static final String RECOVERY_FILES_DIR_NAME = "recovery_temp";
-    private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null;
+    private Map<Long, JobEntityCommits> jobId2WinnerEntitiesMap = null;
     private final long cachedEntityCommitsPerJobSize;
     private final PersistentLocalResourceRepository localResourceRepository;
     private final ICheckpointManager checkpointManager;
@@ -183,7 +183,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
     public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN)
             throws IOException, ACIDException {
         try {
-            Set<Integer> winnerJobSet = startRecoverysAnalysisPhase(partitions, logReader, lowWaterMarkLSN);
+            Set<Long> winnerJobSet = startRecoverysAnalysisPhase(partitions, logReader, lowWaterMarkLSN);
             startRecoveryRedoPhase(partitions, logReader, lowWaterMarkLSN, winnerJobSet);
         } finally {
             logReader.close();
@@ -191,13 +191,13 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         }
     }
 
-    private synchronized Set<Integer> startRecoverysAnalysisPhase(Set<Integer> partitions, ILogReader logReader,
+    private synchronized Set<Long> startRecoverysAnalysisPhase(Set<Integer> partitions, ILogReader logReader,
             long lowWaterMarkLSN) throws IOException, ACIDException {
         int updateLogCount = 0;
         int entityCommitLogCount = 0;
         int jobCommitLogCount = 0;
         int abortLogCount = 0;
-        Set<Integer> winnerJobSet = new HashSet<>();
+        Set<Long> winnerJobSet = new HashSet<>();
         jobId2WinnerEntitiesMap = new HashMap<>();
         //set log reader to the lowWaterMarkLsn
         ILogRecord logRecord;
@@ -214,8 +214,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                     }
                     break;
                 case LogType.JOB_COMMIT:
-                    winnerJobSet.add(logRecord.getJobId());
-                    cleanupJobCommits(logRecord.getJobId());
+                    winnerJobSet.add(logRecord.getTxnId());
+                    cleanupTxnCommits(logRecord.getTxnId());
                     jobCommitLogCount++;
                     break;
                 case LogType.ENTITY_COMMIT:
@@ -249,38 +249,38 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         return winnerJobSet;
     }
 
-    private void cleanupJobCommits(int jobId) {
-        if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
-            JobEntityCommits jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
+    private void cleanupTxnCommits(long txnId) {
+        if (jobId2WinnerEntitiesMap.containsKey(txnId)) {
+            JobEntityCommits jobEntityWinners = jobId2WinnerEntitiesMap.get(txnId);
             //to delete any spilled files as well
             jobEntityWinners.clear();
-            jobId2WinnerEntitiesMap.remove(jobId);
+            jobId2WinnerEntitiesMap.remove(txnId);
         }
     }
 
     private void analyzeEntityCommitLog(ILogRecord logRecord) throws IOException {
-        int jobId = logRecord.getJobId();
+        long txnId = logRecord.getTxnId();
         JobEntityCommits jobEntityWinners;
-        if (!jobId2WinnerEntitiesMap.containsKey(jobId)) {
-            jobEntityWinners = new JobEntityCommits(jobId);
+        if (!jobId2WinnerEntitiesMap.containsKey(txnId)) {
+            jobEntityWinners = new JobEntityCommits(txnId);
             if (needToFreeMemory()) {
                 // If we don't have enough memory for one more job,
                 // we will force all jobs to spill their cached entities to disk.
                 // This could happen only when we have many jobs with small
                 // number of records and none of them have job commit.
-                freeJobsCachedEntities(jobId);
+                freeJobsCachedEntities(txnId);
             }
-            jobId2WinnerEntitiesMap.put(jobId, jobEntityWinners);
+            jobId2WinnerEntitiesMap.put(txnId, jobEntityWinners);
         } else {
-            jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
+            jobEntityWinners = jobId2WinnerEntitiesMap.get(txnId);
         }
         jobEntityWinners.add(logRecord);
     }
 
     private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, ILogReader logReader,
-            long lowWaterMarkLSN, Set<Integer> winnerJobSet) throws IOException, ACIDException {
+            long lowWaterMarkLSN, Set<Long> winnerTxnSet) throws IOException, ACIDException {
         int redoCount = 0;
-        int jobId = -1;
+        long jobId;
 
         long resourceId;
         long maxDiskLastLsn;
@@ -296,7 +296,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
         Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources();
         Map<Long, Long> resourceId2MaxLSNMap = new HashMap<>();
-        TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
+        TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, false);
 
         ILogRecord logRecord = null;
         try {
@@ -307,18 +307,18 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                     LOGGER.info(logRecord.getLogRecordForDisplay());
                 }
                 lsn = logRecord.getLSN();
-                jobId = logRecord.getJobId();
+                jobId = logRecord.getTxnId();
                 foundWinner = false;
                 switch (logRecord.getLogType()) {
                     case LogType.UPDATE:
                         if (partitions.contains(logRecord.getResourcePartition())) {
-                            if (winnerJobSet.contains(jobId)) {
+                            if (winnerTxnSet.contains(jobId)) {
                                 foundWinner = true;
                             } else if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
                                 jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
-                                tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+                                tempKeyTxnEntityId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
                                         logRecord.getPKValue(), logRecord.getPKValueSize());
-                                if (jobEntityWinners.containsEntityCommitForTxnId(lsn, tempKeyTxnId)) {
+                                if (jobEntityWinners.containsEntityCommitForTxnId(lsn, tempKeyTxnEntityId)) {
                                     foundWinner = true;
                                 }
                             }
@@ -449,9 +449,9 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
     }
 
     @Override
-    public File createJobRecoveryFile(int jobId, String fileName) throws IOException {
+    public File createJobRecoveryFile(long txnId, String fileName) throws IOException {
         String recoveryDirPath = getRecoveryDirPath();
-        Path jobRecoveryFolder = Paths.get(recoveryDirPath + File.separator + jobId);
+        Path jobRecoveryFolder = Paths.get(recoveryDirPath + File.separator + txnId);
         if (!Files.exists(jobRecoveryFolder)) {
             Files.createDirectories(jobRecoveryFolder);
         }
@@ -459,10 +459,10 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         File jobRecoveryFile = new File(jobRecoveryFolder.toString() + File.separator + fileName);
         if (!jobRecoveryFile.exists()) {
             if (!jobRecoveryFile.createNewFile()) {
-                throw new IOException("Failed to create file: " + fileName + " for job id(" + jobId + ")");
+                throw new IOException("Failed to create file: " + fileName + " for txn id(" + txnId + ")");
             }
         } else {
-            throw new IOException("File: " + fileName + " for job id(" + jobId + ") already exists");
+            throw new IOException("File: " + fileName + " for txn id(" + txnId + ") already exists");
         }
         return jobRecoveryFile;
     }
@@ -483,11 +483,11 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         return logDir + RECOVERY_FILES_DIR_NAME;
     }
 
-    private void freeJobsCachedEntities(int requestingJobId) throws IOException {
+    private void freeJobsCachedEntities(long requestingTxnId) throws IOException {
         if (jobId2WinnerEntitiesMap != null) {
-            for (Entry<Integer, JobEntityCommits> jobEntityCommits : jobId2WinnerEntitiesMap.entrySet()) {
+            for (Entry<Long, JobEntityCommits> jobEntityCommits : jobId2WinnerEntitiesMap.entrySet()) {
                 //if the job is not the requester, free its memory
-                if (jobEntityCommits.getKey() != requestingJobId) {
+                if (jobEntityCommits.getKey() != requestingTxnId) {
                     jobEntityCommits.getValue().spillToDiskAndfreeMemory();
                 }
             }
@@ -496,7 +496,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
     @Override
     public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException {
-        int abortedJobId = txnContext.getJobId().getId();
+        long abortedTxnId = txnContext.getTxnId().getId();
         // Obtain the first/last log record LSNs written by the Job
         long firstLSN = txnContext.getFirstLSN();
         /*
@@ -517,7 +517,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         // check if the transaction actually wrote some logs.
         if (firstLSN == TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN || firstLSN > lastLSN) {
             if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("no need to roll back as there were no operations by the job " + txnContext.getJobId());
+                LOGGER.info("no need to roll back as there were no operations by the txn " + txnContext.getTxnId());
             }
             return;
         }
@@ -527,13 +527,13 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             LOGGER.info("collecting loser transaction's LSNs from " + firstLSN + " to " + lastLSN);
         }
 
-        Map<TxnId, List<Long>> jobLoserEntity2LSNsMap = new HashMap<>();
-        TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
+        Map<TxnEntityId, List<Long>> jobLoserEntity2LSNsMap = new HashMap<>();
+        TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, false);
         int updateLogCount = 0;
         int entityCommitLogCount = 0;
-        int logJobId = -1;
+        long logTxnId;
         long currentLSN = -1;
-        TxnId loserEntity = null;
+        TxnEntityId loserEntity;
         List<Long> undoLSNSet = null;
         //get active partitions on this node
         Set<Integer> activePartitions = localResourceRepository.getActivePartitions();
@@ -552,19 +552,20 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                         LOGGER.info(logRecord.getLogRecordForDisplay());
                     }
                 }
-                logJobId = logRecord.getJobId();
-                if (logJobId != abortedJobId) {
+                logTxnId = logRecord.getTxnId();
+                if (logTxnId != abortedTxnId) {
                     continue;
                 }
-                tempKeyTxnId.setTxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+                tempKeyTxnEntityId.setTxnId(logTxnId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
                         logRecord.getPKValue(), logRecord.getPKValueSize());
                 switch (logRecord.getLogType()) {
                     case LogType.UPDATE:
                         if (activePartitions.contains(logRecord.getResourcePartition())) {
-                            undoLSNSet = jobLoserEntity2LSNsMap.get(tempKeyTxnId);
+                            undoLSNSet = jobLoserEntity2LSNsMap.get(tempKeyTxnEntityId);
                             if (undoLSNSet == null) {
-                                loserEntity = new TxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
-                                        logRecord.getPKValue(), logRecord.getPKValueSize(), true);
+                                loserEntity =
+                                        new TxnEntityId(logTxnId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+                                                logRecord.getPKValue(), logRecord.getPKValueSize(), true);
                                 undoLSNSet = new LinkedList<>();
                                 jobLoserEntity2LSNsMap.put(loserEntity, undoLSNSet);
                             }
@@ -572,17 +573,17 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                             updateLogCount++;
                             if (IS_DEBUG_MODE) {
                                 LOGGER.info(Thread.currentThread().getId() + "======> update[" + currentLSN + "]:"
-                                        + tempKeyTxnId);
+                                        + tempKeyTxnEntityId);
                             }
                         }
                         break;
                     case LogType.ENTITY_COMMIT:
                         if (activePartitions.contains(logRecord.getResourcePartition())) {
-                            jobLoserEntity2LSNsMap.remove(tempKeyTxnId);
+                            jobLoserEntity2LSNsMap.remove(tempKeyTxnEntityId);
                             entityCommitLogCount++;
                             if (IS_DEBUG_MODE) {
                                 LOGGER.info(Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
-                                        + tempKeyTxnId);
+                                        + tempKeyTxnEntityId);
                             }
                         }
                         break;
@@ -601,7 +602,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
             if (currentLSN != lastLSN) {
                 throw new ACIDException("LastLSN mismatch: lastLSN(" + lastLSN + ") vs currentLSN(" + currentLSN
-                        + ") during abort( " + txnContext.getJobId() + ")");
+                        + ") during abort( " + txnContext.getTxnId() + ")");
             }
 
             //undo loserTxn's effect
@@ -610,10 +611,10 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             IDatasetLifecycleManager datasetLifecycleManager =
                     txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
             //TODO sort loser entities by smallest LSN to undo in one pass.
-            Iterator<Entry<TxnId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator();
+            Iterator<Entry<TxnEntityId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator();
             int undoCount = 0;
             while (iter.hasNext()) {
-                Map.Entry<TxnId, List<Long>> loserEntity2LSNsMap = iter.next();
+                Map.Entry<TxnEntityId, List<Long>> loserEntity2LSNsMap = iter.next();
                 undoLSNSet = loserEntity2LSNsMap.getValue();
                 // The step below is important since the upsert operations must be done in reverse order.
                 Collections.reverse(undoLSNSet);
@@ -622,7 +623,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                     //read the corresponding log record to be undone.
                     logRecord = logReader.read(undoLSN);
                     if (logRecord == null) {
-                        throw new ACIDException("IllegalState exception during abort( " + txnContext.getJobId() + ")");
+                        throw new ACIDException("IllegalState exception during abort( " + txnContext.getTxnId() + ")");
                     }
                     if (IS_DEBUG_MODE) {
                         LOGGER.info(logRecord.getLogRecordForDisplay());
@@ -713,25 +714,25 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
     private class JobEntityCommits {
         private static final String PARTITION_FILE_NAME_SEPARATOR = "_";
-        private final int jobId;
-        private final Set<TxnId> cachedEntityCommitTxns = new HashSet<>();
+        private final long txnId;
+        private final Set<TxnEntityId> cachedEntityCommitTxns = new HashSet<>();
         private final List<File> jobEntitCommitOnDiskPartitionsFiles = new ArrayList<>();
         //a flag indicating whether all the the commits for this jobs have been added.
         private boolean preparedForSearch = false;
-        private TxnId winnerEntity = null;
+        private TxnEntityId winnerEntity = null;
         private int currentPartitionSize = 0;
         private long partitionMaxLSN = 0;
         private String currentPartitonName;
 
-        public JobEntityCommits(int jobId) {
-            this.jobId = jobId;
+        public JobEntityCommits(long txnId) {
+            this.txnId = txnId;
         }
 
         public void add(ILogRecord logRecord) throws IOException {
             if (preparedForSearch) {
                 throw new IOException("Cannot add new entity commits after preparing for search.");
             }
-            winnerEntity = new TxnId(logRecord.getJobId(), logRecord.getDatasetId(), logRecord.getPKHashValue(),
+            winnerEntity = new TxnEntityId(logRecord.getTxnId(), logRecord.getDatasetId(), logRecord.getPKHashValue(),
                     logRecord.getPKValue(), logRecord.getPKValueSize(), true);
             cachedEntityCommitTxns.add(winnerEntity);
             //since log file is read sequentially, LSNs are always increasing
@@ -772,15 +773,15 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             preparedForSearch = true;
         }
 
-        public boolean containsEntityCommitForTxnId(long logLSN, TxnId txnId) throws IOException {
+        public boolean containsEntityCommitForTxnId(long logLSN, TxnEntityId txnEntityId) throws IOException {
             //if we don't have any partitions on disk, search only from memory
             if (jobEntitCommitOnDiskPartitionsFiles.size() == 0) {
-                return cachedEntityCommitTxns.contains(txnId);
+                return cachedEntityCommitTxns.contains(txnEntityId);
             } else {
                 //get candidate partitions from disk
                 ArrayList<File> candidatePartitions = getCandidiatePartitions(logLSN);
                 for (File partition : candidatePartitions) {
-                    if (serachPartition(partition, txnId)) {
+                    if (serachPartition(partition, txnEntityId)) {
                         return true;
                     }
                 }
@@ -814,17 +815,17 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             jobEntitCommitOnDiskPartitionsFiles.clear();
         }
 
-        private boolean serachPartition(File partition, TxnId txnId) throws IOException {
+        private boolean serachPartition(File partition, TxnEntityId txnEntityId) throws IOException {
             //load partition from disk if it is not  already in memory
             if (!partition.getName().equals(currentPartitonName)) {
                 loadPartitionToMemory(partition, cachedEntityCommitTxns);
                 currentPartitonName = partition.getName();
             }
-            return cachedEntityCommitTxns.contains(txnId);
+            return cachedEntityCommitTxns.contains(txnEntityId);
         }
 
         private String getPartitionName(long maxLSN) {
-            return jobId + PARTITION_FILE_NAME_SEPARATOR + maxLSN;
+            return txnId + PARTITION_FILE_NAME_SEPARATOR + maxLSN;
         }
 
         private long getPartitionMaxLSNFromName(String partitionName) {
@@ -835,18 +836,18 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             //if we don't have enough memory to allocate for this partition,
             // we will ask recovery manager to free memory
             if (needToFreeMemory()) {
-                freeJobsCachedEntities(jobId);
+                freeJobsCachedEntities(txnId);
             }
             //allocate a buffer that can hold the current partition
             ByteBuffer buffer = ByteBuffer.allocate(currentPartitionSize);
-            for (Iterator<TxnId> iterator = cachedEntityCommitTxns.iterator(); iterator.hasNext();) {
-                TxnId txnId = iterator.next();
+            for (Iterator<TxnEntityId> iterator = cachedEntityCommitTxns.iterator(); iterator.hasNext();) {
+                TxnEntityId txnEntityId = iterator.next();
                 //serialize the object and remove it from memory
-                txnId.serialize(buffer);
+                txnEntityId.serialize(buffer);
                 iterator.remove();
             }
             //name partition file based on job id and max lsn
-            File partitionFile = createJobRecoveryFile(jobId, getPartitionName(partitionMaxLSN));
+            File partitionFile = createJobRecoveryFile(txnId, getPartitionName(partitionMaxLSN));
             //write file to disk
             try (FileOutputStream fileOutputstream = new FileOutputStream(partitionFile, false);
                     FileChannel fileChannel = fileOutputstream.getChannel()) {
@@ -858,11 +859,11 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             jobEntitCommitOnDiskPartitionsFiles.add(partitionFile);
         }
 
-        private void loadPartitionToMemory(File partition, Set<TxnId> partitionTxn) throws IOException {
+        private void loadPartitionToMemory(File partition, Set<TxnEntityId> partitionTxn) throws IOException {
             partitionTxn.clear();
             //if we don't have enough memory to a load partition, we will ask recovery manager to free memory
             if (needToFreeMemory()) {
-                freeJobsCachedEntities(jobId);
+                freeJobsCachedEntities(txnId);
             }
             ByteBuffer buffer = ByteBuffer.allocateDirect((int) partition.length());
             //load partition to memory
@@ -873,9 +874,9 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                 }
             }
             buffer.flip();
-            TxnId temp = null;
+            TxnEntityId temp;
             while (buffer.remaining() != 0) {
-                temp = TxnId.deserialize(buffer);
+                temp = TxnEntityId.deserialize(buffer);
                 partitionTxn.add(temp);
             }
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 608769b..e42b5e5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -42,7 +42,7 @@ import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
 import org.apache.asterix.external.api.IAdapterFactory;
@@ -279,7 +279,7 @@ public class FeedOperations {
         Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorIdMapping = new HashMap<>();
         Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = new HashMap<>();
         Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>();
-        List<JobId> jobIds = new ArrayList<>();
+        List<TxnId> txnIds = new ArrayList<>();
         FeedMetaOperatorDescriptor metaOp;
 
         for (int iter1 = 0; iter1 < jobsList.size(); iter1++) {
@@ -415,11 +415,11 @@ public class FeedOperations {
             for (OperatorDescriptorId root : subJob.getRoots()) {
                 jobSpec.addRoot(jobSpec.getOperatorMap().get(operatorIdMapping.get(root)));
             }
-            jobIds.add(((JobEventListenerFactory) subJob.getJobletEventListenerFactory()).getJobId());
+            txnIds.add(((JobEventListenerFactory) subJob.getJobletEventListenerFactory()).getTxnId());
         }
 
         // jobEventListenerFactory
-        jobSpec.setJobletEventListenerFactory(new MultiTransactionJobletEventListenerFactory(jobIds, true));
+        jobSpec.setJobletEventListenerFactory(new MultiTransactionJobletEventListenerFactory(txnIds, true));
         // useConnectorSchedulingPolicy
         jobSpec.setUseConnectorPolicyForScheduling(jobsList.get(0).isUseConnectorPolicyForScheduling());
         // connectorAssignmentPolicy

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
index 958444c..4137fbd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
@@ -20,12 +20,13 @@
 package org.apache.asterix.utils;
 
 import org.apache.asterix.common.config.CompilerProperties;
+import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
-import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
+import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -59,8 +60,8 @@ public class FlushDatasetUtil {
         AlgebricksMetaOperatorDescriptor emptySource = new AlgebricksMetaOperatorDescriptor(spec, 0, 1,
                 new IPushRuntimeFactory[] { new EmptyTupleSourceRuntimeFactory() }, rDescs);
 
-        org.apache.asterix.common.transactions.JobId jobId = JobIdFactory.generateJobId();
-        FlushDatasetOperatorDescriptor flushOperator = new FlushDatasetOperatorDescriptor(spec, jobId,
+        TxnId txnId = TxnIdFactory.create();
+        FlushDatasetOperatorDescriptor flushOperator = new FlushDatasetOperatorDescriptor(spec, txnId,
                 dataset.getDatasetId());
 
         spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0);
@@ -72,7 +73,7 @@ public class FlushDatasetUtil {
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource,
                 primaryPartitionConstraint);
 
-        JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, true);
+        JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(txnId, true);
         spec.setJobletEventListenerFactory(jobEventListenerFactory);
         JobUtils.runJob(hcc, spec, true);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 9e34d70..42f577f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -34,7 +34,7 @@ import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.metadata.MetadataManager;
@@ -47,7 +47,7 @@ import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.metadata.utils.IndexUtil;
 import org.apache.asterix.rebalance.IDatasetRebalanceCallback;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
-import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
+import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -275,21 +275,21 @@ public class RebalanceUtil {
     private static void populateDataToRebalanceTarget(Dataset source, Dataset target, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc) throws Exception {
         JobSpecification spec = new JobSpecification();
-        JobId jobId = JobIdFactory.generateJobId();
-        JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, true);
+        TxnId txnId = TxnIdFactory.create();
+        JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(txnId, true);
         spec.setJobletEventListenerFactory(jobEventListenerFactory);
 
         // The pipeline starter.
         IOperatorDescriptor starter = DatasetUtil.createDummyKeyProviderOp(spec, source, metadataProvider);
 
         // Creates primary index scan op.
-        IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, source, jobId);
+        IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, source, txnId);
 
         // Creates secondary BTree upsert op.
         IOperatorDescriptor upsertOp = createPrimaryIndexUpsertOp(spec, metadataProvider, source, target);
 
         // The final commit operator.
-        IOperatorDescriptor commitOp = createUpsertCommitOp(spec, metadataProvider, jobId, target);
+        IOperatorDescriptor commitOp = createUpsertCommitOp(spec, metadataProvider, txnId, target);
 
         // Connects empty-tuple-source and scan.
         spec.connect(new OneToOneConnectorDescriptor(spec), starter, 0, primaryScanOp, 0);
@@ -326,11 +326,11 @@ public class RebalanceUtil {
 
     // Creates the commit operator for populating the target dataset.
     private static IOperatorDescriptor createUpsertCommitOp(JobSpecification spec, MetadataProvider metadataProvider,
-            JobId jobId, Dataset target) throws AlgebricksException {
+            TxnId txnId, Dataset target) throws AlgebricksException {
         int[] primaryKeyFields = getPrimaryKeyPermutationForUpsert(target);
         return new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
                 new IPushRuntimeFactory[] {
-                        target.getCommitRuntimeFactory(metadataProvider, jobId, primaryKeyFields, true) },
+                        target.getCommitRuntimeFactory(metadataProvider, txnId, primaryKeyFields, true) },
                 new RecordDescriptor[] { target.getPrimaryRecordDescriptor(metadataProvider) });
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 1810517..df8eef7 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -40,6 +40,7 @@ import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -152,8 +153,8 @@ public class TestNodeController {
         ExecutionTestUtil.tearDown(cleanupOnStop);
     }
 
-    public org.apache.asterix.common.transactions.JobId getTxnJobId(IHyracksTaskContext ctx) {
-        return new org.apache.asterix.common.transactions.JobId((int) ctx.getJobletContext().getJobId().getId());
+    public TxnId getTxnJobId(IHyracksTaskContext ctx) {
+        return new TxnId(ctx.getJobletContext().getJobId().getId());
     }
 
     public Pair<LSMInsertDeleteOperatorNodePushable, CommitRuntime> getInsertPipeline(IHyracksTaskContext ctx,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
index d1d869e..d2b1276 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
@@ -26,15 +26,15 @@ import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 public abstract class AbstractOperationCallbackFactory implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    protected final JobId jobId;
+    protected final TxnId txnId;
     protected final int datasetId;
     protected final int[] primaryKeyFields;
     protected final ITransactionSubsystemProvider txnSubsystemProvider;
     protected final byte resourceType;
 
-    public AbstractOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
+    public AbstractOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields,
             ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) {
-        this.jobId = jobId;
+        this.txnId = txnId;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
         this.txnSubsystemProvider = txnSubsystemProvider;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
index a4c41df..cb278a7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
@@ -31,19 +31,19 @@ public class Checkpoint implements Comparable<Checkpoint> {
 
     private final long checkpointLsn;
     private final long minMCTFirstLsn;
-    private final int maxJobId;
+    private final long maxTxnId;
     private final long timeStamp;
     private final boolean sharp;
     private final int storageVersion;
 
     @JsonCreator
     public Checkpoint(@JsonProperty("checkpointLsn") long checkpointLsn,
-            @JsonProperty("minMCTFirstLsn") long minMCTFirstLsn, @JsonProperty("maxJobId") int maxJobId,
+            @JsonProperty("minMCTFirstLsn") long minMCTFirstLsn, @JsonProperty("maxJobId") long maxTxnId,
             @JsonProperty("timeStamp") long timeStamp, @JsonProperty("sharp") boolean sharp,
             @JsonProperty("storageVersion") int storageVersion) {
         this.checkpointLsn = checkpointLsn;
         this.minMCTFirstLsn = minMCTFirstLsn;
-        this.maxJobId = maxJobId;
+        this.maxTxnId = maxTxnId;
         this.timeStamp = timeStamp;
         this.sharp = sharp;
         this.storageVersion = storageVersion;
@@ -57,8 +57,8 @@ public class Checkpoint implements Comparable<Checkpoint> {
         return minMCTFirstLsn;
     }
 
-    public int getMaxJobId() {
-        return maxJobId;
+    public long getMaxJobId() {
+        return maxTxnId;
     }
 
     public long getTimeStamp() {
@@ -108,7 +108,7 @@ public class Checkpoint implements Comparable<Checkpoint> {
         final int prime = 31;
         int result = 1;
         result = prime * result + (int) (checkpointLsn ^ (checkpointLsn >>> 32));
-        result = prime * result + maxJobId;
+        result = prime * result + Long.hashCode(maxTxnId);
         result = prime * result + (int) (minMCTFirstLsn ^ (minMCTFirstLsn >>> 32));
         result = prime * result + (sharp ? 1231 : 1237);
         result = prime * result + storageVersion;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 6ee0980..4090b65 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -32,36 +32,33 @@ public interface ILogRecord {
         LARGE_RECORD
     }
 
-    public static final int CHKSUM_LEN = Long.BYTES;
-    public static final int FLDCNT_LEN = Integer.BYTES;
-    public static final int DS_LEN = Integer.BYTES;
-    public static final int LOG_SOURCE_LEN = Byte.BYTES;
-    public static final int LOGRCD_SZ_LEN = Integer.BYTES;
-    public static final int NEWOP_LEN = Byte.BYTES;
-    public static final int NEWVALSZ_LEN = Integer.BYTES;
-    public static final int PKHASH_LEN = Integer.BYTES;
-    public static final int PKSZ_LEN = Integer.BYTES;
-    public static final int PRVLSN_LEN = Long.BYTES;
-    public static final int RS_PARTITION_LEN = Integer.BYTES;
-    public static final int RSID_LEN = Long.BYTES;
-    public static final int SEQ_NUM_LEN = Long.BYTES;
-    public static final int TYPE_LEN = Byte.BYTES;
-    public static final int UUID_LEN = Long.BYTES;
-
-    public static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
-    public static final int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN;
-    public static final int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
-    public static final int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
-    // What are these fields? vvvvv
-    public static final int REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN = Long.BYTES + Integer.BYTES + Integer.BYTES;
-
-    // How are the following computed?
-    public static final int JOB_TERMINATE_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
-    public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 30; // ALL_RECORD_HEADER_LEN + CHKSUM_LEN +?
-    public static final int UPDATE_LOG_BASE_SIZE = 51; // ALL_RECORD_HEADER_LEN + CHKSUM_LEN +?
-    public static final int FLUSH_LOG_SIZE = 18; // ALL_RECORD_HEADER_LEN + CHKSUM_LEN +?
-    public static final int WAIT_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
-    public static final int MARKER_BASE_LOG_SIZE =
+    int CHKSUM_LEN = Long.BYTES;
+    int FLDCNT_LEN = Integer.BYTES;
+    int DS_LEN = Integer.BYTES;
+    int LOG_SOURCE_LEN = Byte.BYTES;
+    int LOGRCD_SZ_LEN = Integer.BYTES;
+    int NEWOP_LEN = Byte.BYTES;
+    int NEWVALSZ_LEN = Integer.BYTES;
+    int PKHASH_LEN = Integer.BYTES;
+    int PKSZ_LEN = Integer.BYTES;
+    int PRVLSN_LEN = Long.BYTES;
+    int RS_PARTITION_LEN = Integer.BYTES;
+    int RSID_LEN = Long.BYTES;
+    int SEQ_NUM_LEN = Long.BYTES;
+    int TYPE_LEN = Byte.BYTES;
+    int UUID_LEN = Long.BYTES;
+
+    int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + TxnId.BYTES;
+    int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN;
+    int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
+    int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
+
+    int JOB_TERMINATE_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
+    int ENTITY_COMMIT_LOG_BASE_SIZE = ALL_RECORD_HEADER_LEN + ENTITYCOMMIT_UPDATE_HEADER_LEN + CHKSUM_LEN;
+    int UPDATE_LOG_BASE_SIZE = ENTITY_COMMIT_LOG_BASE_SIZE + UPDATE_LSN_HEADER + UPDATE_BODY_HEADER;
+    int FLUSH_LOG_SIZE = ALL_RECORD_HEADER_LEN + DatasetId.BYTES + CHKSUM_LEN;
+    int WAIT_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
+    int MARKER_BASE_LOG_SIZE =
             ALL_RECORD_HEADER_LEN + CHKSUM_LEN + DS_LEN + RS_PARTITION_LEN + PRVLSN_LEN + LOGRCD_SZ_LEN;
 
     public RecordReadStatus readLogRecord(ByteBuffer buffer);
@@ -80,9 +77,9 @@ public interface ILogRecord {
 
     public void setLogType(byte logType);
 
-    public int getJobId();
+    long getTxnId();
 
-    public void setJobId(int jobId);
+    void setTxnId(long jobId);
 
     public int getDatasetId();
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index 7965aa5..dea7a67 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -98,13 +98,13 @@ public interface IRecoveryManager {
     /**
      * Creates a temporary file to be used during recovery
      *
-     * @param jobId
+     * @param txnId
      * @param fileName
      * @return A file to the created temporary file
      * @throws IOException
-     *             if the file for the specified {@code jobId} with the {@code fileName} already exists
+     *             if the file for the specified {@code txnId} with the {@code fileName} already exists
      */
-    File createJobRecoveryFile(int jobId, String fileName) throws IOException;
+    File createJobRecoveryFile(long txnId, String fileName) throws IOException;
 
     /**
      * Deletes all temporary recovery files

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
index f9d924f..3dda5d3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
@@ -25,7 +25,7 @@ public interface ITransactionContext {
     public void registerIndexAndCallback(long resourceId, ILSMIndex index, AbstractOperationCallback callback,
             boolean isPrimaryIndex);
 
-    public JobId getJobId();
+    public TxnId getTxnId();
 
     public void setTimeout(boolean isTimeout);
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
index 0123814..77c6a9f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
@@ -42,26 +42,26 @@ public interface ITransactionManager {
      * Begins a transaction identified by a transaction id and returns the
      * associated transaction context.
      *
-     * @param jobId
+     * @param txnId
      *            a unique value for the transaction id.
      * @return the transaction context associated with the initiated transaction
      * @see ITransactionContext
      * @throws ACIDException
      */
-    public ITransactionContext beginTransaction(JobId jobId) throws ACIDException;
+    public ITransactionContext beginTransaction(TxnId txnId) throws ACIDException;
 
     /**
      * Returns the transaction context of an active transaction given the
      * transaction id.
      *
-     * @param jobId
+     * @param txnId
      *            a unique value for the transaction id.
      * @param createIfNotExist
      *            TODO
      * @return
      * @throws ACIDException
      */
-    public ITransactionContext getTransactionContext(JobId jobId, boolean createIfNotExist) throws ACIDException;
+    public ITransactionContext getTransactionContext(TxnId txnId, boolean createIfNotExist) throws ACIDException;
 
     /**
      * Commits a transaction.
@@ -73,7 +73,7 @@ public interface ITransactionManager {
      * @param pkHash
      *            TODO
      * @throws ACIDException
-     * @see ITransactionContextimport org.apache.hyracks.api.job.JobId;
+     * @see ITransactionContextimport org.apache.hyracks.api.job.TxnId;
      * @see ACIDException
      */
     public void commitTransaction(ITransactionContext txnContext, DatasetId datasetId, int pkHash)
@@ -125,8 +125,8 @@ public interface ITransactionManager {
     public ITransactionSubsystem getTransactionSubsystem();
 
     /**
-     * @return The current max job id.
+     * @return The current max txn id.
      */
-    int getMaxJobId();
+    long getMaxTxnId();
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java
deleted file mode 100644
index 9654a92..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.transactions;
-
-import java.io.Serializable;
-
-public class JobId implements Serializable {
-    private static final long serialVersionUID = 1L;
-    /**
-     * The number of bytes used to represent {@link JobId} value.
-     */
-    public static final int BYTES = Integer.BYTES;
-
-    private int id;
-
-    public JobId(int id) {
-        this.id = id;
-    }
-
-    public int getId() {
-        return id;
-    }
-
-    @Override
-    public int hashCode() {
-        return id;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == this) {
-            return true;
-        }
-        if (!(o instanceof JobId)) {
-            return false;
-        }
-        return ((JobId) o).id == id;
-    }
-
-    @Override
-    public String toString() {
-        return "JID:" + id;
-    }
-
-    public void setId(int jobId) {
-        id = jobId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index e80cfa6..68afb2a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -32,10 +32,10 @@ import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
 /**
  * == LogRecordFormat ==
  * ---------------------------
- * [Header1] (6 bytes) : for all log types
+ * [Header1] (10 bytes) : for all log types
  * LogSource(1)
  * LogType(1)
- * JobId(4)
+ * TxnId(8)
  * ---------------------------
  * [Header2] (16 bytes + PKValueSize) : for entity_commit, upsert_entity_commit, and update log types
  * DatasetId(4) //stored in dataset_dataset in Metadata Node
@@ -57,16 +57,6 @@ import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
  * [Tail] (8 bytes) : for all log types
  * Checksum(8)
  * ---------------------------
- * = LogSize =
- * 1) JOB_COMMIT_LOG_SIZE: 14 bytes (Header1(6) + Tail(8))
- * 2) ENTITY_COMMIT || UPSERT_ENTITY_COMMIT: (Header1(6) + Header2(16) + Tail(8)) + PKValueSize
- * --> ENTITY_COMMIT_LOG_BASE_SIZE = 30
- * 3) UPDATE: (Header1(6) + Header2(16) + + Header3(20) + Body(9) + Tail(8)) + PKValueSize + NewValueSize
- * --> UPDATE_LOG_BASE_SIZE = 59
- * 4) FLUSH: 18 bytes (Header1(6) + DatasetId(4) + Tail(8))
- * 5) WAIT_LOG_SIZE: 14 bytes (Header1(6) + Tail(8))
- * --> WAIT_LOG only requires LogType Field, but in order to conform the log reader protocol
- * it also includes LogSource and JobId fields.
  */
 
 public class LogRecord implements ILogRecord {
@@ -74,7 +64,7 @@ public class LogRecord implements ILogRecord {
     // ------------- fields in a log record (begin) ------------//
     private byte logSource;
     private byte logType;
-    private int jobId;
+    private long txnId;
     private int datasetId;
     private int PKHashValue;
     private int PKValueSize;
@@ -130,7 +120,7 @@ public class LogRecord implements ILogRecord {
     private void doWriteLogRecord(ByteBuffer buffer) {
         buffer.put(logSource);
         buffer.put(logType);
-        buffer.putInt(jobId);
+        buffer.putLong(txnId);
         switch (logType) {
             case LogType.ENTITY_COMMIT:
                 writeEntityInfo(buffer);
@@ -248,7 +238,7 @@ public class LogRecord implements ILogRecord {
         }
         logSource = buffer.get();
         logType = buffer.get();
-        jobId = buffer.getInt();
+        txnId = buffer.getLong();
         switch (logType) {
             case LogType.FLUSH:
                 if (buffer.remaining() < ILogRecord.DS_LEN) {
@@ -454,7 +444,7 @@ public class LogRecord implements ILogRecord {
         builder.append(" LSN : ").append(LSN);
         builder.append(" LogType : ").append(LogType.toString(logType));
         builder.append(" LogSize : ").append(logSize);
-        builder.append(" JobId : ").append(jobId);
+        builder.append(" TxnId : ").append(txnId);
         if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPDATE) {
             builder.append(" DatasetId : ").append(datasetId);
             builder.append(" ResourcePartition : ").append(resourcePartition);
@@ -503,13 +493,13 @@ public class LogRecord implements ILogRecord {
     }
 
     @Override
-    public int getJobId() {
-        return jobId;
+    public long getTxnId() {
+        return txnId;
     }
 
     @Override
-    public void setJobId(int jobId) {
-        this.jobId = jobId;
+    public void setTxnId(long jobId) {
+        this.txnId = jobId;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnId.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnId.java
new file mode 100644
index 0000000..b0d38b8
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnId.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import java.io.Serializable;
+
+public class TxnId implements Serializable {
+    private static final long serialVersionUID = 1L;
+    /**
+     * The number of bytes used to represent {@link TxnId} value.
+     */
+    public static final int BYTES = Long.BYTES;
+
+    protected long id;
+
+    public TxnId(long id) {
+        this.id = id;
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    @Override
+    public int hashCode() {
+        return Long.hashCode(id);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof TxnId)) {
+            return false;
+        }
+        return ((TxnId) o).id == id;
+    }
+
+    @Override
+    public String toString() {
+        return "TxnId:" + id;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/592af654/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
index e9f96f9..be4b47f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
@@ -34,21 +34,21 @@ public class TransactionUtil {
 
     public static void formJobTerminateLogRecord(ITransactionContext txnCtx, LogRecord logRecord, boolean isCommit) {
         logRecord.setTxnCtx(txnCtx);
-        TransactionUtil.formJobTerminateLogRecord(logRecord, txnCtx.getJobId().getId(), isCommit);
+        TransactionUtil.formJobTerminateLogRecord(logRecord, txnCtx.getTxnId().getId(), isCommit);
     }
 
-    public static void formJobTerminateLogRecord(LogRecord logRecord, int jobId, boolean isCommit) {
+    public static void formJobTerminateLogRecord(LogRecord logRecord, long txnId, boolean isCommit) {
         logRecord.setLogType(isCommit ? LogType.JOB_COMMIT : LogType.ABORT);
         logRecord.setDatasetId(-1);
         logRecord.setPKHashValue(-1);
-        logRecord.setJobId(jobId);
+        logRecord.setTxnId(txnId);
         logRecord.computeAndSetLogSize();
     }
 
     public static void formFlushLogRecord(LogRecord logRecord, int datasetId, PrimaryIndexOperationTracker opTracker,
             String nodeId, int numberOfIndexes) {
         logRecord.setLogType(LogType.FLUSH);
-        logRecord.setJobId(-1);
+        logRecord.setTxnId(-1);
         logRecord.setDatasetId(datasetId);
         logRecord.setOpTracker(opTracker);
         logRecord.setNumOfFlushedIndexes(numberOfIndexes);
@@ -60,7 +60,7 @@ public class TransactionUtil {
             int PKHashValue, ITupleReference PKValue, int[] PKFields, int resourcePartition, byte entityCommitType) {
         logRecord.setTxnCtx(txnCtx);
         logRecord.setLogType(entityCommitType);
-        logRecord.setJobId(txnCtx.getJobId().getId());
+        logRecord.setTxnId(txnCtx.getTxnId().getId());
         logRecord.setDatasetId(datasetId);
         logRecord.setPKHashValue(PKHashValue);
         logRecord.setPKFieldCnt(PKFields.length);
@@ -76,7 +76,7 @@ public class TransactionUtil {
         logRecord.setTxnCtx(txnCtx);
         logRecord.setLogSource(LogSource.LOCAL);
         logRecord.setLogType(LogType.MARKER);
-        logRecord.setJobId(txnCtx.getJobId().getId());
+        logRecord.setTxnId(txnCtx.getTxnId().getId());
         logRecord.setDatasetId(datasetId);
         logRecord.setResourcePartition(resourcePartition);
         marker.get(); // read the first byte since it is not part of the marker object


Mime
View raw message