asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Luo Chen (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: [ASTERIXDB-2231][STO] Seperate primary op tracker for each p...
Date Sat, 06 Jan 2018 05:43:10 GMT
Luo Chen has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2263

Change subject: [ASTERIXDB-2231][STO] Seperate primary op tracker for each partition
......................................................................

[ASTERIXDB-2231][STO] Seperate primary op tracker for each partition

- user model changes: no
- storage format changes: yes. The local resource has an extra field
partition.
- interface changes: yes. Introduce the notion of partition to Hyracks
storage layer.

Details:
- Seperate primary index operation tracker for each partition, instead
of having a global one on each NC to achieve better scalability.
- As a coordinated change, seperate component id generator for each
partition as well.
- Introduce the notion of partition to Hyracks storage layer such that
the LSM index can get the proper primary op tracker based on its
partition.
- Introduce the notion of partition to transactions such that upon
transaction operations, the proper primary op tracker is operated.

Change-Id: I9eb3854d2343e45beeccb87b0d434e5f4efd69c9
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
M asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
M asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/DatasetLocalResourceFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
M asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
M hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResourceFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilderFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResourceFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResourceFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResourceFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResourceFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResourceFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResourceFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResourceFactory.java
M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceFactory.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
67 files changed, 471 insertions(+), 361 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/63/2263/1

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java
index 18ef143..1223f0c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java
@@ -75,8 +75,8 @@
     }
 
     @Override
-    public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
-        return asterixAppRuntimeContext.getLSMBTreeOperationTracker(datasetID);
+    public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID, int partition) {
+        return asterixAppRuntimeContext.getLSMBTreeOperationTracker(datasetID, partition);
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 9c53c18..a8efb56 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -69,8 +69,8 @@
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
 import org.apache.asterix.metadata.api.IMetadataNode;
 import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
-import org.apache.asterix.replication.management.ReplicationManager;
 import org.apache.asterix.replication.management.ReplicationChannel;
+import org.apache.asterix.replication.management.ReplicationManager;
 import org.apache.asterix.runtime.transaction.GlobalResourceIdFactoryProvider;
 import org.apache.asterix.runtime.utils.NoOpCoordinationService;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
@@ -111,15 +111,15 @@
     private ILSMMergePolicyFactory metadataMergePolicyFactory;
     private final INCServiceContext ncServiceContext;
     private final IResourceIdFactory resourceIdFactory;
-    private CompilerProperties compilerProperties;
-    private ExternalProperties externalProperties;
-    private MetadataProperties metadataProperties;
-    private StorageProperties storageProperties;
-    private TransactionProperties txnProperties;
-    private ActiveProperties activeProperties;
-    private BuildProperties buildProperties;
-    private ReplicationProperties replicationProperties;
-    private MessagingProperties messagingProperties;
+    private final CompilerProperties compilerProperties;
+    private final ExternalProperties externalProperties;
+    private final MetadataProperties metadataProperties;
+    private final StorageProperties storageProperties;
+    private final TransactionProperties txnProperties;
+    private final ActiveProperties activeProperties;
+    private final BuildProperties buildProperties;
+    private final ReplicationProperties replicationProperties;
+    private final MessagingProperties messagingProperties;
     private final NodeProperties nodeProperties;
     private ExecutorService threadExecutor;
     private IDatasetMemoryManager datasetMemoryManager;
@@ -207,8 +207,8 @@
                         datasetMemoryManager, indexCheckpointManagerProvider, ioManager.getIODevices().size());
         final String nodeId = getServiceContext().getNodeId();
         final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
-        final Set<Integer> nodePartitionsIds = Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId)
-                .collect(Collectors.toSet());
+        final Set<Integer> nodePartitionsIds =
+                Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
         replicaManager = new ReplicaManager(this, nodePartitionsIds);
         isShuttingdown = false;
         activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
@@ -377,8 +377,8 @@
     }
 
     @Override
-    public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
-        return datasetLifecycleManager.getOperationTracker(datasetID);
+    public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID, int partition) {
+        return datasetLifecycleManager.getOperationTracker(datasetID, partition);
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index fca0848..6fca678 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -90,8 +90,8 @@
     private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
     private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
             new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
-    private static final GenerationFunction[] RECORD_GEN_FUNCTION = { GenerationFunction.DETERMINISTIC,
-            GenerationFunction.DETERMINISTIC };
+    private static final GenerationFunction[] RECORD_GEN_FUNCTION =
+            { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
     private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
     private static final ARecordType META_TYPE = null;
     private static final GenerationFunction[] META_GEN_FUNCTION = null;
@@ -102,6 +102,7 @@
     private static final int TOTAL_NUM_OF_RECORDS = 10000;
     private static final int RECORDS_PER_COMPONENT = 1000;
     private static final int DATASET_ID = 101;
+    private static final int PARTITION_ID = 1;
     private static final String DATAVERSE_NAME = "TestDV";
     private static final String DATASET_NAME = "TestDS";
     private static final String DATA_TYPE_NAME = "DUMMY";
@@ -152,14 +153,15 @@
         List<List<String>> partitioningKeys = new ArrayList<>();
         partitioningKeys.add(Collections.singletonList("key"));
         int partition = 0;
-        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
-                NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
-                        partitioningKeys, null, null, null, false, null),
-                null, DatasetType.INTERNAL, DATASET_ID, 0);
+        dataset =
+                new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                        NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                                partitioningKeys, null, null, null, false, null),
+                        null, DatasetType.INTERNAL, DATASET_ID, 0);
         PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
                 storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, partition);
-        IndexDataflowHelperFactory iHelperFactory = new IndexDataflowHelperFactory(nc.getStorageManager(),
-                primaryIndexInfo.getFileSplitProvider());
+        IndexDataflowHelperFactory iHelperFactory =
+                new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
         JobId jobId = nc.newJobId();
         ctx = nc.createTestContext(jobId, partition, false);
         indexDataflowHelper = iHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
@@ -218,7 +220,7 @@
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID, PARTITION_ID).refresh();
             ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             // rollback a memory component
             lsmAccessor.deleteComponents(memoryComponentsPredicate);
@@ -227,7 +229,7 @@
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
 
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID, PARTITION_ID).refresh();
             ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
             lsmAccessor.deleteComponents(pred);
@@ -282,7 +284,7 @@
             searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
 
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID, PARTITION_ID).refresh();
             ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             // rollback a memory component
             lsmAccessor.deleteComponents(memoryComponentsPredicate);
@@ -309,7 +311,7 @@
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
 
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID, PARTITION_ID).refresh();
             ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
             lsmAccessor.deleteComponents(pred);
@@ -359,7 +361,7 @@
             // now that we enetered, we will rollback
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
 
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID, PARTITION_ID).refresh();
             ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             // rollback a memory component
             lsmAccessor.deleteComponents(
@@ -380,7 +382,7 @@
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
 
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID, PARTITION_ID).refresh();
             ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
             DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
             lsmAccessor.deleteComponents(pred);
@@ -789,7 +791,7 @@
                 public void run() {
                     ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
                     try {
-                        dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+                        dsLifecycleMgr.getComponentIdGenerator(DATASET_ID, PARTITION_ID).refresh();
                         ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
                         lsmAccessor.deleteComponents(predicate);
                     } catch (HyracksDataException e) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
index e376ff9..9a528d3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
@@ -33,9 +33,9 @@
 
     private final List<ITestOpCallback<Void>> callbacks = new ArrayList<>();
 
-    public TestPrimaryIndexOperationTracker(int datasetID, ILogManager logManager, DatasetInfo dsInfo,
+    public TestPrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo,
             ILSMComponentIdGenerator idGenerator) {
-        super(datasetID, logManager, dsInfo, idGenerator);
+        super(datasetID, partition, logManager, dsInfo, idGenerator);
     }
 
     public void addCallback(ITestOpCallback<Void> callback) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
index 5d7a7c6..409adb4 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
@@ -20,6 +20,7 @@
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.util.Map;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.context.DatasetLifecycleManager;
@@ -32,7 +33,7 @@
 public class TestPrimaryIndexOperationTrackerFactory extends PrimaryIndexOperationTrackerFactory {
 
     private static final long serialVersionUID = 1L;
-    private int datasetId;
+    private final int datasetId;
 
     public TestPrimaryIndexOperationTrackerFactory(int datasetId) {
         super(datasetId);
@@ -40,17 +41,18 @@
     }
 
     @Override
-    public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
+    public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, int partition) {
         try {
             INcApplicationContext appCtx = (INcApplicationContext) ctx.getApplicationContext();
             DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) appCtx.getDatasetLifecycleManager();
             DatasetResource dsr = dslcManager.getDatasetLifecycle(datasetId);
-            PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
+            PrimaryIndexOperationTracker opTracker = dslcManager.getOperationTracker(datasetId, partition);
             if (!(opTracker instanceof TestPrimaryIndexOperationTracker)) {
-                Field opTrackerField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTracker");
-                opTracker = new TestPrimaryIndexOperationTracker(datasetId,
-                        appCtx.getTransactionSubsystem().getLogManager(), dsr.getDatasetInfo(), dsr.getIdGenerator());
-                setFinal(opTrackerField, dsr, opTracker);
+                Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
+                opTracker = new TestPrimaryIndexOperationTracker(datasetId, partition,
+                        appCtx.getTransactionSubsystem().getLogManager(), dsr.getDatasetInfo(),
+                        dslcManager.getComponentIdGenerator(datasetId, partition));
+                replaceMapEntry(opTrackersField, dsr, partition, opTracker);
             }
             return opTracker;
         } catch (Exception e) {
@@ -65,4 +67,14 @@
         modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
         field.set(obj, newValue);
     }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    static void replaceMapEntry(Field field, Object obj, Object key, Object value)
+            throws Exception, IllegalAccessException {
+        field.setAccessible(true);
+        Field modifiersField = Field.class.getDeclaredField("modifiers");
+        modifiersField.setAccessible(true);
+        Map map = (Map) field.get(obj);
+        map.put(key, value);
+    }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
index decee99..70e5f6e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
@@ -42,6 +42,7 @@
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
 import org.junit.After;
 import org.junit.Assert;
@@ -67,8 +68,8 @@
 
     @Test
     public void abortMetadataTxn() throws Exception {
-        ICcApplicationContext appCtx = (ICcApplicationContext) integrationUtil.getClusterControllerService()
-                .getApplicationContext();
+        ICcApplicationContext appCtx =
+                (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext();
         final MetadataProvider metadataProvider = new MetadataProvider(appCtx, null);
         final MetadataTransactionContext mdTxn = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxn);
@@ -95,8 +96,8 @@
 
     @Test
     public void rebalanceFailureMetadataTxn() throws Exception {
-        ICcApplicationContext appCtx = (ICcApplicationContext) integrationUtil.getClusterControllerService()
-                .getApplicationContext();
+        ICcApplicationContext appCtx =
+                (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext();
         String nodeGroup = "ng";
         String datasetName = "dataset1";
         final TestCaseContext.OutputFormat format = TestCaseContext.OutputFormat.CLEAN_JSON;
@@ -155,8 +156,8 @@
         testExecutor.executeSqlppUpdateOrDdl("CREATE DATASET " + datasetName + "(KeyType) PRIMARY KEY id;", format);
 
         // get created dataset
-        ICcApplicationContext appCtx = (ICcApplicationContext) integrationUtil.getClusterControllerService()
-                .getApplicationContext();
+        ICcApplicationContext appCtx =
+                (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext();
         MetadataProvider metadataProvider = new MetadataProvider(appCtx, null);
         final MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -199,12 +200,14 @@
         Assert.assertEquals(0, failCount.get());
 
         // make sure all metadata indexes have no pending operations after all txns committed/aborted
-        final IDatasetLifecycleManager datasetLifecycleManager = ((INcApplicationContext) integrationUtil.ncs[0]
-                .getApplicationContext()).getDatasetLifecycleManager();
+        final IDatasetLifecycleManager datasetLifecycleManager =
+                ((INcApplicationContext) integrationUtil.ncs[0].getApplicationContext()).getDatasetLifecycleManager();
         int maxMetadatasetId = 14;
         for (int i = 1; i <= maxMetadatasetId; i++) {
-            if (datasetLifecycleManager.getIndex(i, i) != null) {
-                final PrimaryIndexOperationTracker opTracker = datasetLifecycleManager.getOperationTracker(i);
+            ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(i, i);
+            if (index != null) {
+                final PrimaryIndexOperationTracker opTracker =
+                        (PrimaryIndexOperationTracker) index.getOperationTracker();
                 Assert.assertEquals(0, opTracker.getNumActiveOperations());
             }
         }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
index f43f3ff..ea7ce2a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
@@ -199,7 +199,7 @@
         final TransactionOptions options = new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL);
         final ITransactionManager transactionManager = ncAppCtx.getTransactionSubsystem().getTransactionManager();
         final ITransactionContext txnCtx = transactionManager.beginTransaction(txnId, options);
-        txnCtx.register(resourceId, index, NoOpOperationCallback.INSTANCE, true);
+        txnCtx.register(resourceId, 0, index, NoOpOperationCallback.INSTANCE, true);
         return txnCtx;
     }
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml
index 334dd52..cf40930 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/only_sqlpp.xml
@@ -19,5 +19,10 @@
  !-->
 <test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp">
   <test-group name="failed">
+   <test-case FilePath="aggregate">
+      <compilation-unit name="sum_null-with-pred">
+        <output-dir compare="Text">sum_null-with-pred</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 41c5ade..4441c6e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -75,17 +75,19 @@
      * creates (if necessary) and returns the primary index operation tracker of a dataset.
      *
      * @param datasetId
+     * @param partition
      * @return
      */
-    PrimaryIndexOperationTracker getOperationTracker(int datasetId);
+    PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition);
 
     /**
      * creates (if necessary) and returns the component Id generator of a dataset.
      *
      * @param datasetId
+     * @param partition
      * @return
      */
-    ILSMComponentIdGenerator getComponentIdGenerator(int datasetId);
+    ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition);
 
     /**
      * creates (if necessary) and returns the dataset virtual buffer caches.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 8a83c7b..1618533 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -66,7 +66,7 @@
 
     IResourceIdFactory getResourceIdFactory();
 
-    ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
+    ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID, int partition);
 
     void initialize(boolean initialRun) throws IOException, ACIDException, AlgebricksException;
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index e5fc998..74910f3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -23,7 +23,6 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.commons.lang3.tuple.Pair;
@@ -92,10 +91,10 @@
         ILSMComponent leftComponent = immutableComponents.get(mergeableIndexes.getLeft());
         ILSMComponent rightComponent = immutableComponents.get(mergeableIndexes.getRight());
         ILSMComponentId targetId = LSMComponentIdUtils.union(leftComponent.getId(), rightComponent.getId());
-        Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos();
-        int partition = getIndexPartition(index, indexInfos);
-        triggerScheduledMerge(targetId,
-                indexInfos.stream().filter(info -> info.getPartition() == partition).collect(Collectors.toSet()));
+        int partition = ((PrimaryIndexOperationTracker) index.getOperationTracker()).getPartition();
+        Set<ILSMIndex> indexes =
+                datasetLifecycleManager.getDatasetInfo(datasetId).getDatasetPartitionIndexes(partition);
+        triggerScheduledMerge(targetId, indexes);
         return true;
     }
 
@@ -107,11 +106,8 @@
      * @param indexInfos
      * @throws HyracksDataException
      */
-    private void triggerScheduledMerge(ILSMComponentId targetId, Set<IndexInfo> indexInfos)
-            throws HyracksDataException {
-        for (IndexInfo info : indexInfos) {
-            ILSMIndex lsmIndex = info.getIndex();
-
+    private void triggerScheduledMerge(ILSMComponentId targetId, Set<ILSMIndex> indexes) throws HyracksDataException {
+        for (ILSMIndex lsmIndex : indexes) {
             List<ILSMDiskComponent> immutableComponents = new ArrayList<>(lsmIndex.getDiskComponents());
             if (isMergeOngoing(immutableComponents)) {
                 continue;
@@ -131,14 +127,5 @@
             ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents);
         }
-    }
-
-    private int getIndexPartition(ILSMIndex index, Set<IndexInfo> indexInfos) {
-        for (IndexInfo info : indexInfos) {
-            if (info.getIndex() == index) {
-                return info.getPartition();
-            }
-        }
-        return -1;
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 9d63818..bf52601 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -30,6 +30,9 @@
 
 public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
     private static final Logger LOGGER = LogManager.getLogger();
+    // partition -> index
+    private final Map<Integer, Set<IndexInfo>> partitionIndexes;
+    // resourceID -> index
     private final Map<Long, IndexInfo> indexes;
     private final int datasetID;
     private int numActiveIOOps;
@@ -40,6 +43,7 @@
     private boolean durable;
 
     public DatasetInfo(int datasetID) {
+        this.partitionIndexes = new HashMap<>();
         this.indexes = new HashMap<>();
         this.setLastAccess(-1);
         this.datasetID = datasetID;
@@ -69,26 +73,17 @@
         notifyAll();
     }
 
-    public synchronized Set<ILSMIndex> getDatasetIndexes() {
-        Set<ILSMIndex> datasetIndexes = new HashSet<>();
-        for (IndexInfo iInfo : getIndexes().values()) {
-            if (iInfo.isOpen()) {
-                datasetIndexes.add(iInfo.getIndex());
+    public synchronized Set<ILSMIndex> getDatasetPartitionIndexes(int partition) {
+        Set<ILSMIndex> partitionIndexes = new HashSet<>();
+        Set<IndexInfo> partitionIndexInfos = this.partitionIndexes.get(partition);
+        if (partitionIndexInfos != null) {
+            for (IndexInfo iInfo : partitionIndexInfos) {
+                if (iInfo.isOpen()) {
+                    partitionIndexes.add(iInfo.getIndex());
+                }
             }
         }
-
-        return datasetIndexes;
-    }
-
-    public synchronized Set<IndexInfo> getDatsetIndexInfos() {
-        Set<IndexInfo> infos = new HashSet<>();
-        for (IndexInfo iInfo : getIndexes().values()) {
-            if (iInfo.isOpen()) {
-                infos.add(iInfo);
-            }
-        }
-
-        return infos;
+        return partitionIndexes;
     }
 
     @Override
@@ -160,6 +155,18 @@
         return indexes;
     }
 
+    public synchronized void addIndex(long resourceID, IndexInfo indexInfo) {
+        indexes.put(resourceID, indexInfo);
+        partitionIndexes.computeIfAbsent(indexInfo.getPartition(), partition -> new HashSet<>()).add(indexInfo);
+    }
+
+    public synchronized void removeIndex(long resourceID) {
+        IndexInfo info = indexes.remove(resourceID);
+        if (info != null) {
+            partitionIndexes.get(info.getPartition()).remove(info);
+        }
+    }
+
     public boolean isRegistered() {
         return isRegistered;
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
index 7b8397c..4ffaae1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
@@ -41,10 +41,10 @@
     }
 
     @Override
-    public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) {
+    public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx, int partition) {
         IDatasetLifecycleManager dslcManager =
                 ((INcApplicationContext) serviceCtx.getApplicationContext()).getDatasetLifecycleManager();
-        return dslcManager.getComponentIdGenerator(datasetId);
+        return dslcManager.getComponentIdGenerator(datasetId, partition);
     }
 
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 5d3d125..c0a3ac0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -139,7 +140,7 @@
             throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST);
         }
 
-        PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
+        PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
         if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
             if (LOGGER.isErrorEnabled()) {
                 final String logMsg = String.format(
@@ -155,7 +156,7 @@
         DatasetInfo dsInfo = dsr.getDatasetInfo();
         dsInfo.waitForIO();
         closeIndex(iInfo);
-        dsInfo.getIndexes().remove(resourceID);
+        dsInfo.removeIndex(resourceID);
         if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
                 && !dsInfo.isExternal()) {
             removeDatasetFromCache(dsInfo.getDatasetID());
@@ -203,10 +204,7 @@
         List<DatasetResource> datasetsResources = new ArrayList<>(datasets.values());
         Collections.sort(datasetsResources);
         for (DatasetResource dsr : datasetsResources) {
-            PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
-            if (opTracker != null && opTracker.getNumActiveOperations() == 0
-                    && dsr.getDatasetInfo().getReferenceCount() == 0 && dsr.getDatasetInfo().isOpen()
-                    && !dsr.isMetadataDataset()) {
+            if (isCandidateDatasetForEviction(dsr)) {
                 closeDataset(dsr);
                 LOGGER.info(() -> "Evicted Dataset" + dsr.getDatasetID());
                 return true;
@@ -215,14 +213,17 @@
         return false;
     }
 
-    private static void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) throws HyracksDataException {
-        if (iInfo.isOpen()) {
-            ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback());
+    private boolean isCandidateDatasetForEviction(DatasetResource dsr) {
+        if (dsr.getDatasetInfo().getReferenceCount() != 0 || !dsr.getDatasetInfo().isOpen()
+                || dsr.isMetadataDataset()) {
+            return false;
         }
-
-        // Wait for the above flush op.
-        dsInfo.waitForIO();
+        for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) {
+            if (opTracker.getNumActiveOperations() != 0) {
+                return false;
+            }
+        }
+        return true;
     }
 
     public DatasetResource getDatasetLifecycle(int did) {
@@ -234,12 +235,9 @@
             dsr = datasets.get(did);
             if (dsr == null) {
                 DatasetInfo dsInfo = new DatasetInfo(did);
-                ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator();
-                PrimaryIndexOperationTracker opTracker =
-                        new PrimaryIndexOperationTracker(did, logManager, dsInfo, idGenerator);
                 DatasetVirtualBufferCaches vbcs = new DatasetVirtualBufferCaches(did, storageProperties,
                         memoryManager.getNumPages(did), numPartitions);
-                dsr = new DatasetResource(dsInfo, opTracker, vbcs, idGenerator);
+                dsr = new DatasetResource(dsInfo, vbcs);
                 datasets.put(did, dsr);
             }
             return dsr;
@@ -318,13 +316,33 @@
     }
 
     @Override
-    public PrimaryIndexOperationTracker getOperationTracker(int datasetId) {
-        return datasets.get(datasetId).getOpTracker();
+    public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition) {
+        DatasetResource dataset = datasets.get(datasetId);
+        PrimaryIndexOperationTracker opTracker = dataset.getOpTracker(partition);
+        if (opTracker == null) {
+            populateOpTrackerAndIdGenerator(dataset, partition);
+            opTracker = dataset.getOpTracker(partition);
+        }
+        return opTracker;
     }
 
     @Override
-    public ILSMComponentIdGenerator getComponentIdGenerator(int datasetId) {
-        return datasets.get(datasetId).getIdGenerator();
+    public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition) {
+        DatasetResource dataset = datasets.get(datasetId);
+        ILSMComponentIdGenerator generator = dataset.getComponentIdGenerator(partition);
+        if (generator == null) {
+            populateOpTrackerAndIdGenerator(dataset, partition);
+            generator = dataset.getComponentIdGenerator(partition);
+        }
+        return generator;
+    }
+
+    private void populateOpTrackerAndIdGenerator(DatasetResource dataset, int partition) {
+        ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator();
+        PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(dataset.getDatasetID(), partition,
+                logManager, dataset.getDatasetInfo(), idGenerator);
+        dataset.setPrimaryIndexOperationTracker(partition, opTracker);
+        dataset.setIdGenerator(partition, idGenerator);
     }
 
     private void validateDatasetLifecycleManagerState() throws HyracksDataException {
@@ -357,24 +375,28 @@
     public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws HyracksDataException {
         //schedule flush for datasets with min LSN (Log Serial Number) < targetLSN
         for (DatasetResource dsr : datasets.values()) {
-            PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
-            synchronized (opTracker) {
-                for (IndexInfo iInfo : dsr.getIndexes().values()) {
-                    AbstractLSMIOOperationCallback ioCallback =
-                            (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback();
-                    if (!(iInfo.getIndex().isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()
-                            || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit())) {
-                        long firstLSN = ioCallback.getFirstLSN();
-                        if (firstLSN < targetLSN) {
-                            LOGGER.info("Checkpoint flush dataset {}", dsr.getDatasetID());
-                            opTracker.setFlushOnExit(true);
-                            if (opTracker.getNumActiveOperations() == 0) {
-                                // No Modify operations currently, we need to trigger the flush and we can do so safely
-                                opTracker.flushIfRequested();
+            for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) {
+                // check all partitions
+                int partition = opTracker.getPartition();
+                synchronized (opTracker) {
+                    for (ILSMIndex lsmIndex : dsr.getDatasetInfo().getDatasetPartitionIndexes(partition)) {
+                        AbstractLSMIOOperationCallback ioCallback =
+                                (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback();
+                        if (!(lsmIndex.isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()
+                                || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit())) {
+                            long firstLSN = ioCallback.getFirstLSN();
+                            if (firstLSN < targetLSN) {
+                                LOGGER.info("Checkpoint flush dataset {} partition {}", dsr.getDatasetID(), partition);
+                                opTracker.setFlushOnExit(true);
+                                if (opTracker.getNumActiveOperations() == 0) {
+                                    // No Modify operations currently, we need to trigger the flush and we can do so safely
+                                    opTracker.flushIfRequested();
+                                }
+                                break;
                             }
-                            break;
                         }
                     }
+
                 }
             }
         }
@@ -389,54 +411,60 @@
             // no memory components for external dataset
             return;
         }
-        PrimaryIndexOperationTracker primaryOpTracker = dsr.getOpTracker();
-        if (primaryOpTracker.getNumActiveOperations() > 0) {
-            throw new IllegalStateException(
-                    "flushDatasetOpenIndexes is called on a dataset with currently active operations");
-        }
+        for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) {
+            // flush each partition one by one
+            if (primaryOpTracker.getNumActiveOperations() > 0) {
+                throw new IllegalStateException(
+                        "flushDatasetOpenIndexes is called on a dataset with currently active operations");
+            }
+            int partition = primaryOpTracker.getPartition();
+            Collection<ILSMIndex> indexes = dsInfo.getDatasetPartitionIndexes(partition);
+            ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID(), partition);
+            idGenerator.refresh();
 
-        ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID());
-        idGenerator.refresh();
+            if (dsInfo.isDurable()) {
+                synchronized (logRecord) {
+                    TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), null, logManager.getNodeId(),
+                            indexes.size());
+                    try {
+                        logManager.log(logRecord);
+                    } catch (ACIDException e) {
+                        throw new HyracksDataException("could not write flush log while closing dataset", e);
+                    }
 
-        if (dsInfo.isDurable()) {
-            synchronized (logRecord) {
-                TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), null, logManager.getNodeId(),
-                        dsInfo.getIndexes().size());
-                try {
-                    logManager.log(logRecord);
-                } catch (ACIDException e) {
-                    throw new HyracksDataException("could not write flush log while closing dataset", e);
+                    try {
+                        //notification will come from LogPage class (notifyFlushTerminator)
+                        logRecord.wait();
+                    } catch (InterruptedException e) {
+                        throw new HyracksDataException(e);
+                    }
                 }
+            }
+            for (ILSMIndex index : indexes) {
+                //update resource lsn
+                AbstractLSMIOOperationCallback ioOpCallback =
+                        (AbstractLSMIOOperationCallback) index.getIOOperationCallback();
+                ioOpCallback.updateLastLSN(logRecord.getLSN());
+            }
 
-                try {
-                    //notification will come from LogPage class (notifyFlushTerminator)
-                    logRecord.wait();
-                } catch (InterruptedException e) {
-                    throw new HyracksDataException(e);
+            if (asyncFlush) {
+                for (ILSMIndex index : indexes) {
+                    ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+                    accessor.scheduleFlush(index.getIOOperationCallback());
+                }
+            } else {
+                for (ILSMIndex index : indexes) {
+                    // TODO: This is not efficient since we flush the indexes sequentially.
+                    // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this
+                    // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker.
+                    ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+                    accessor.scheduleFlush(index.getIOOperationCallback());
+                    // Wait for the above flush op.
+                    dsInfo.waitForIO();
                 }
             }
         }
 
-        for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
-            //update resource lsn
-            AbstractLSMIOOperationCallback ioOpCallback =
-                    (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback();
-            ioOpCallback.updateLastLSN(logRecord.getLSN());
-        }
-
-        if (asyncFlush) {
-            for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
-                ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
-                accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback());
-            }
-        } else {
-            for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
-                // TODO: This is not efficient since we flush the indexes sequentially.
-                // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this
-                // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker.
-                flushAndWaitForIO(dsInfo, iInfo);
-            }
-        }
     }
 
     private void closeDataset(DatasetResource dsr) throws HyracksDataException {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index c02de7e..8dcae23 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.common.context;
 
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
@@ -41,17 +43,16 @@
  */
 public class DatasetResource implements Comparable<DatasetResource> {
     private final DatasetInfo datasetInfo;
-    private final PrimaryIndexOperationTracker datasetPrimaryOpTracker;
     private final DatasetVirtualBufferCaches datasetVirtualBufferCaches;
-    private final ILSMComponentIdGenerator datasetComponentIdGenerator;
 
-    public DatasetResource(DatasetInfo datasetInfo, PrimaryIndexOperationTracker datasetPrimaryOpTracker,
-            DatasetVirtualBufferCaches datasetVirtualBufferCaches,
-            ILSMComponentIdGenerator datasetComponentIdGenerator) {
+    private final Map<Integer, PrimaryIndexOperationTracker> datasetPrimaryOpTrackers;
+    private final Map<Integer, ILSMComponentIdGenerator> datasetComponentIdGenerators;
+
+    public DatasetResource(DatasetInfo datasetInfo, DatasetVirtualBufferCaches datasetVirtualBufferCaches) {
         this.datasetInfo = datasetInfo;
-        this.datasetPrimaryOpTracker = datasetPrimaryOpTracker;
         this.datasetVirtualBufferCaches = datasetVirtualBufferCaches;
-        this.datasetComponentIdGenerator = datasetComponentIdGenerator;
+        this.datasetPrimaryOpTrackers = new HashMap<>();
+        this.datasetComponentIdGenerators = new HashMap<>();
     }
 
     public boolean isRegistered() {
@@ -108,7 +109,8 @@
         if (index == null) {
             throw new HyracksDataException("Attempt to register a null index");
         }
-        datasetInfo.getIndexes().put(resourceID, new IndexInfo(index, datasetInfo.getDatasetID(), resource,
+
+        datasetInfo.addIndex(resourceID, new IndexInfo(index, datasetInfo.getDatasetID(), resource,
                 ((DatasetLocalResource) resource.getResource()).getPartition()));
     }
 
@@ -116,12 +118,31 @@
         return datasetInfo;
     }
 
-    public PrimaryIndexOperationTracker getOpTracker() {
-        return datasetPrimaryOpTracker;
+    public PrimaryIndexOperationTracker getOpTracker(int partition) {
+        return datasetPrimaryOpTrackers.get(partition);
     }
 
-    public ILSMComponentIdGenerator getIdGenerator() {
-        return datasetComponentIdGenerator;
+    public Collection<PrimaryIndexOperationTracker> getOpTrackers() {
+        return datasetPrimaryOpTrackers.values();
+    }
+
+    public ILSMComponentIdGenerator getComponentIdGenerator(int partition) {
+        return datasetComponentIdGenerators.get(partition);
+    }
+
+    public void setPrimaryIndexOperationTracker(int partition, PrimaryIndexOperationTracker opTracker) {
+        if (datasetPrimaryOpTrackers.containsKey(partition)) {
+            throw new IllegalStateException(
+                    "PrimaryIndexOperationTracker has already been set for partition " + partition);
+        }
+        datasetPrimaryOpTrackers.put(partition, opTracker);
+    }
+
+    public void setIdGenerator(int partition, ILSMComponentIdGenerator idGenerator) {
+        if (datasetComponentIdGenerators.containsKey(partition)) {
+            throw new IllegalStateException("LSMComponentIdGenerator has already been set for partition " + partition);
+        }
+        datasetComponentIdGenerators.put(partition, idGenerator);
     }
 
     @Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 14e91ba..4ef5adf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -43,6 +43,7 @@
 
 public class PrimaryIndexOperationTracker extends BaseOperationTracker {
 
+    private final int partition;
     // Number of active operations on an ILSMIndex instance.
     private final AtomicInteger numActiveOperations;
     private final ILogManager logManager;
@@ -50,9 +51,10 @@
     private boolean flushOnExit = false;
     private boolean flushLogCreated = false;
 
-    public PrimaryIndexOperationTracker(int datasetID, ILogManager logManager, DatasetInfo dsInfo,
+    public PrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo,
             ILSMComponentIdGenerator idGenerator) {
         super(datasetID, dsInfo);
+        this.partition = partition;
         this.logManager = logManager;
         this.numActiveOperations = new AtomicInteger();
         this.idGenerator = idGenerator;
@@ -100,7 +102,7 @@
         // or if there is a flush scheduled by the checkpoint (flushOnExit), then schedule it
 
         boolean needsFlush = false;
-        Set<ILSMIndex> indexes = dsInfo.getDatasetIndexes();
+        Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionIndexes(partition);
 
         if (!flushOnExit) {
             for (ILSMIndex lsmIndex : indexes) {
@@ -129,8 +131,7 @@
                  * Generate a FLUSH log.
                  * Flush will be triggered when the log is written to disk by LogFlusher.
                  */
-                TransactionUtil.formFlushLogRecord(logRecord, datasetID, this, logManager.getNodeId(),
-                        dsInfo.getDatasetIndexes().size());
+                TransactionUtil.formFlushLogRecord(logRecord, datasetID, this, logManager.getNodeId(), indexes.size());
                 try {
                     logManager.log(logRecord);
                 } catch (ACIDException e) {
@@ -147,7 +148,7 @@
     //This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled.
     public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException {
         idGenerator.refresh();
-        for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) {
+        for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionIndexes(partition)) {
             //get resource
             ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             //update resource lsn
@@ -200,4 +201,8 @@
         return flushLogCreated;
     }
 
+    public int getPartition() {
+        return partition;
+    }
+
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index c625988..bacebf1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -125,9 +125,9 @@
             if (oldComponents == null) {
                 throw new IllegalStateException("Merge must have old components");
             }
-            LongPointable markerLsn = LongPointable.FACTORY.createPointable(ComponentUtils
-                    .getLong(oldComponents.get(0).getMetadata(), ComponentUtils.MARKER_LSN_KEY,
-                            ComponentUtils.NOT_FOUND));
+            LongPointable markerLsn =
+                    LongPointable.FACTORY.createPointable(ComponentUtils.getLong(oldComponents.get(0).getMetadata(),
+                            ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND));
             newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn);
         } else if (opType == LSMIOOperationType.FLUSH) {
             // advance memory component indexes
@@ -182,8 +182,8 @@
         if (mergedComponents == null || mergedComponents.isEmpty()) {
             return null;
         }
-        return LSMComponentIdUtils
-                .union(mergedComponents.get(0).getId(), mergedComponents.get(mergedComponents.size() - 1).getId());
+        return LSMComponentIdUtils.union(mergedComponents.get(0).getId(),
+                mergedComponents.get(mergedComponents.size() - 1).getId());
 
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
index ed56ab1..a5e2d32 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
@@ -38,17 +38,20 @@
 
     protected transient INCServiceContext ncCtx;
 
+    protected transient int partition;
+
     public AbstractLSMIndexIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) {
         this.idGeneratorFactory = idGeneratorFactory;
     }
 
     @Override
-    public void initialize(INCServiceContext ncCtx) {
+    public void initialize(INCServiceContext ncCtx, int partition) {
         this.ncCtx = ncCtx;
+        this.partition = partition;
     }
 
     protected ILSMComponentIdGenerator getComponentIdGenerator() {
-        return idGeneratorFactory.getComponentIdGenerator(ncCtx);
+        return idGeneratorFactory.getComponentIdGenerator(ncCtx, partition);
     }
 
     protected IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
@@ -60,7 +63,7 @@
             private static final long serialVersionUID = 1L;
 
             @Override
-            public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) {
+            public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx, int partition) {
                 // used for backward compatibility
                 // if idGeneratorFactory is not set for legacy lsm indexes, we return a default
                 // component id generator which always generates the missing component id.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java
index 229fb6d..cf07d3b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java
@@ -40,7 +40,7 @@
 
     double getBloomFilterFalsePositiveRate();
 
-    ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
+    ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID, int partition);
 
     ILSMIOOperationScheduler getLSMIOScheduler();
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
index c4a2d03..a3d5bc5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
@@ -38,11 +38,13 @@
      * transaction.
      *
      * @param resourceId
+     * @param partition
      * @param index
      * @param callback
      * @param primaryIndex
      */
-    void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback, boolean primaryIndex);
+    void register(long resourceId, int partition, ILSMIndex index, IModificationOperationCallback callback,
+            boolean primaryIndex);
 
     /**
      * Gets the unique transaction id.
@@ -135,8 +137,10 @@
      * Called to notify the transaction that an entity commit
      * log belonging to this transaction has been flushed to
      * disk.
+     *
+     * @param partition
      */
-    void notifyEntityCommitted();
+    void notifyEntityCommitted(int partition);
 
     /**
      * Called after an operation is performed on index
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
index 3aa7b17..f9f742a 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
@@ -22,15 +22,14 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.context.CorrelatedPrefixMergePolicy;
 import org.apache.asterix.common.context.DatasetInfo;
 import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
@@ -59,6 +58,8 @@
     private final int MAX_COMPONENT_COUNT = 3;
 
     private final int DATASET_ID = 1;
+
+    private long nextResourceId = 0;
 
     @Test
     public void testBasic() {
@@ -183,19 +184,15 @@
         }
     }
 
-    private ILSMMergePolicy mockMergePolicy(IndexInfo... indexes) {
+    private ILSMMergePolicy mockMergePolicy(IndexInfo... indexInfos) {
         Map<String, String> properties = new HashMap<>();
         properties.put("max-tolerance-component-count", String.valueOf(MAX_COMPONENT_COUNT));
         properties.put("max-mergable-component-size", String.valueOf(MAX_COMPONENT_SIZE));
 
-        Set<IndexInfo> indexInfos = new HashSet<>();
-        for (IndexInfo info : indexes) {
-            indexInfos.add(info);
+        DatasetInfo dsInfo = new DatasetInfo(DATASET_ID);
+        for (IndexInfo index : indexInfos) {
+            dsInfo.addIndex(index.getResourceId(), index);
         }
-
-        DatasetInfo dsInfo = Mockito.mock(DatasetInfo.class);
-        Mockito.when(dsInfo.getDatsetIndexInfos()).thenReturn(indexInfos);
-
         IDatasetLifecycleManager manager = Mockito.mock(IDatasetLifecycleManager.class);
         Mockito.when(manager.getDatasetInfo(DATASET_ID)).thenReturn(dsInfo);
 
@@ -238,8 +235,16 @@
 
         Mockito.when(index.createAccessor(Mockito.any(IIndexAccessParameters.class))).thenReturn(accessor);
         Mockito.when(index.isPrimaryIndex()).thenReturn(isPrimary);
+        if (isPrimary) {
+            PrimaryIndexOperationTracker opTracker = Mockito.mock(PrimaryIndexOperationTracker.class);
+            Mockito.when(opTracker.getPartition()).thenReturn(partition);
+            Mockito.when(index.getOperationTracker()).thenReturn(opTracker);
+        }
         final LocalResource localResource = Mockito.mock(LocalResource.class);
-        return new IndexInfo(index, DATASET_ID, localResource, partition);
+        Mockito.when(localResource.getId()).thenReturn(nextResourceId++);
+        IndexInfo indexInfo = new IndexInfo(index, DATASET_ID, localResource, partition);
+        indexInfo.setOpen(true);
+        return indexInfo;
     }
 
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index ed3bbe0..9e942bc 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -42,6 +42,7 @@
 import org.apache.asterix.common.transactions.ImmutableDatasetId;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.metadata.api.ExtensionMetadataDataset;
@@ -478,7 +479,9 @@
             IIndexAccessParameters iap = new IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
             ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(iap);
             txnCtx.setWriteTxn(true);
-            txnCtx.register(metadataIndex.getResourceId(), lsmIndex, modCallback, metadataIndex.isPrimaryIndex());
+            txnCtx.register(metadataIndex.getResourceId(),
+                    StoragePathUtil.getPartitionNumFromRelativePath(resourceName), lsmIndex, modCallback,
+                    metadataIndex.isPrimaryIndex());
             LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, transactionSubsystem.getLogManager());
             switch (op) {
                 case INSERT:
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 3205cb6..a9f4162 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -63,7 +63,6 @@
 import org.apache.asterix.metadata.utils.MetadataUtil;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory;
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerFactory;
 import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
@@ -110,14 +109,14 @@
     private static String metadataNodeName;
     private static List<String> nodeNames;
     private static boolean isNewUniverse;
-    private static final IMetadataIndex[] PRIMARY_INDEXES = new IMetadataIndex[] {
-            MetadataPrimaryIndexes.DATAVERSE_DATASET, MetadataPrimaryIndexes.DATASET_DATASET,
-            MetadataPrimaryIndexes.DATATYPE_DATASET, MetadataPrimaryIndexes.INDEX_DATASET,
-            MetadataPrimaryIndexes.NODE_DATASET, MetadataPrimaryIndexes.NODEGROUP_DATASET,
-            MetadataPrimaryIndexes.FUNCTION_DATASET, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET,
-            MetadataPrimaryIndexes.FEED_DATASET, MetadataPrimaryIndexes.FEED_POLICY_DATASET,
-            MetadataPrimaryIndexes.LIBRARY_DATASET, MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET,
-            MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET };
+    private static final IMetadataIndex[] PRIMARY_INDEXES =
+            new IMetadataIndex[] { MetadataPrimaryIndexes.DATAVERSE_DATASET, MetadataPrimaryIndexes.DATASET_DATASET,
+                    MetadataPrimaryIndexes.DATATYPE_DATASET, MetadataPrimaryIndexes.INDEX_DATASET,
+                    MetadataPrimaryIndexes.NODE_DATASET, MetadataPrimaryIndexes.NODEGROUP_DATASET,
+                    MetadataPrimaryIndexes.FUNCTION_DATASET, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET,
+                    MetadataPrimaryIndexes.FEED_DATASET, MetadataPrimaryIndexes.FEED_POLICY_DATASET,
+                    MetadataPrimaryIndexes.LIBRARY_DATASET, MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET,
+                    MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET };
 
     private MetadataBootstrap() {
     }
@@ -267,9 +266,9 @@
 
     private static void insertInitialCompactionPolicies(MetadataTransactionContext mdTxnCtx)
             throws AlgebricksException {
-        String[] builtInCompactionPolicyClassNames = new String[] { ConstantMergePolicyFactory.class.getName(),
-                PrefixMergePolicyFactory.class.getName(), NoMergePolicyFactory.class.getName(),
-                CorrelatedPrefixMergePolicyFactory.class.getName() };
+        String[] builtInCompactionPolicyClassNames =
+                new String[] { ConstantMergePolicyFactory.class.getName(), PrefixMergePolicyFactory.class.getName(),
+                        NoMergePolicyFactory.class.getName(), CorrelatedPrefixMergePolicyFactory.class.getName() };
         for (String policyClassName : builtInCompactionPolicyClassNames) {
             CompactionPolicy compactionPolicy = getCompactionPolicyEntity(policyClassName);
             MetadataManager.INSTANCE.addCompactionPolicy(mdTxnCtx, compactionPolicy);
@@ -289,8 +288,8 @@
     private static CompactionPolicy getCompactionPolicyEntity(String compactionPolicyClassName)
             throws AlgebricksException {
         try {
-            String policyName = ((ILSMMergePolicyFactory) (Class.forName(compactionPolicyClassName).newInstance()))
-                    .getName();
+            String policyName =
+                    ((ILSMMergePolicyFactory) (Class.forName(compactionPolicyClassName).newInstance())).getName();
             return new CompactionPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, policyName,
                     compactionPolicyClassName);
         } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
@@ -327,13 +326,13 @@
         // We are unable to do this since IStorageManager needs a dataset to determine
         // the appropriate
         // objects
-        ILSMOperationTrackerFactory opTrackerFactory = index.isPrimaryIndex()
-                ? new PrimaryIndexOperationTrackerFactory(datasetId)
-                : new SecondaryIndexOperationTrackerFactory(datasetId);
-        ILSMComponentIdGeneratorFactory idGeneratorProvider = new DatasetLSMComponentIdGeneratorFactory(
-                index.getDatasetId().getId());
-        ILSMIOOperationCallbackFactory ioOpCallbackFactory = new LSMBTreeIOOperationCallbackFactory(
-                idGeneratorProvider);
+        ILSMOperationTrackerFactory opTrackerFactory =
+                index.isPrimaryIndex() ? new PrimaryIndexOperationTrackerFactory(datasetId)
+                        : new SecondaryIndexOperationTrackerFactory(datasetId);
+        ILSMComponentIdGeneratorFactory idGeneratorProvider =
+                new DatasetLSMComponentIdGeneratorFactory(index.getDatasetId().getId());
+        ILSMIOOperationCallbackFactory ioOpCallbackFactory =
+                new LSMBTreeIOOperationCallbackFactory(idGeneratorProvider);
         IStorageComponentProvider storageComponentProvider = appContext.getStorageComponentProvider();
         if (isNewUniverse()) {
             LSMBTreeLocalResourceFactory lsmBtreeFactory = new LSMBTreeLocalResourceFactory(
@@ -343,13 +342,13 @@
                     storageComponentProvider.getIoOperationSchedulerProvider(),
                     appContext.getMetadataMergePolicyFactory(), GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, true,
                     bloomFilterKeyFields, appContext.getBloomFilterFalsePositiveRate(), true, null);
-            DatasetLocalResourceFactory dsLocalResourceFactory = new DatasetLocalResourceFactory(datasetId,
-                    lsmBtreeFactory);
+            DatasetLocalResourceFactory dsLocalResourceFactory =
+                    new DatasetLocalResourceFactory(datasetId, lsmBtreeFactory);
             // TODO(amoudi) Creating the index should be done through the same code path as
             // other indexes
             // This is to be done by having a metadata dataset associated with each index
             IIndexBuilder indexBuilder = new IndexBuilder(ncServiceCtx, storageComponentProvider.getStorageManager(),
-                    index::getResourceId, file, dsLocalResourceFactory, true);
+                    index::getResourceId, file, dsLocalResourceFactory, true, metadataPartition.getPartitionId());
             indexBuilder.build();
         } else {
             final LocalResource resource = localResourceRepository.get(file.getRelativePath());
@@ -364,8 +363,8 @@
             if (index.getResourceId() != resource.getId()) {
                 throw new HyracksDataException("Resource Id doesn't match expected metadata index resource id");
             }
-            IndexDataflowHelper indexHelper = new IndexDataflowHelper(ncServiceCtx,
-                    storageComponentProvider.getStorageManager(), file);
+            IndexDataflowHelper indexHelper =
+                    new IndexDataflowHelper(ncServiceCtx, storageComponentProvider.getStorageManager(), file);
             indexHelper.open(); // Opening the index through the helper will ensure it gets instantiated
             indexHelper.close();
         }
@@ -420,8 +419,8 @@
                 LOGGER.info("Dropped a pending dataverse: " + dataverse.getDataverseName());
             }
         } else {
-            List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
-                    dataverse.getDataverseName());
+            List<Dataset> datasets =
+                    MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverse.getDataverseName());
             for (Dataset dataset : datasets) {
                 recoverDataset(mdTxnCtx, dataset);
             }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
index 0de61ff..6faffc7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -18,13 +18,12 @@
  */
 package org.apache.asterix.runtime.job.listener;
 
-import static org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
-
 import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksJobletContext;
@@ -60,9 +59,8 @@
 
     @Override
     public void updateListenerJobParameters(JobParameterByteStore jobParameterByteStore) {
-        String AsterixTransactionIdString =
-                new String(jobParameterByteStore.getParameterValue(TRANSACTION_ID_PARAMETER_NAME, 0,
-                        TRANSACTION_ID_PARAMETER_NAME.length));
+        String AsterixTransactionIdString = new String(jobParameterByteStore
+                .getParameterValue(TRANSACTION_ID_PARAMETER_NAME, 0, TRANSACTION_ID_PARAMETER_NAME.length));
         if (AsterixTransactionIdString.length() > 0) {
             this.txnId = new TxnId(Integer.parseInt(AsterixTransactionIdString));
         }
@@ -75,8 +73,9 @@
             @Override
             public void jobletFinish(JobStatus jobStatus) {
                 try {
-                    ITransactionManager txnManager = ((INcApplicationContext) jobletContext.getServiceContext()
-                            .getApplicationContext()).getTransactionSubsystem().getTransactionManager();
+                    ITransactionManager txnManager =
+                            ((INcApplicationContext) jobletContext.getServiceContext().getApplicationContext())
+                                    .getTransactionSubsystem().getTransactionManager();
                     ITransactionContext txnContext = txnManager.getTransactionContext(txnId);
                     txnContext.setWriteTxn(transactionalWrite);
                     if (jobStatus != JobStatus.FAILURE) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 97fd7ce..ef2cca1 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -73,7 +73,7 @@
             IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
                     resource.getId(), aResource.getPartition(), resourceType, indexOp, operatorNodePushable);
-            txnCtx.register(resource.getId(), index, modCallback, true);
+            txnCtx.register(resource.getId(), aResource.getPartition(), index, modCallback, true);
             return modCallback;
         } catch (ACIDException e) {
             throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
index f40140a..180d6fc 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
@@ -36,10 +36,10 @@
     }
 
     @Override
-    public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
+    public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, int partition) {
         IDatasetLifecycleManager dslcManager =
                 ((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
-        return dslcManager.getOperationTracker(datasetId);
+        return dslcManager.getOperationTracker(datasetId, partition);
     }
 
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 0c20ee9..75b2d37 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -69,7 +69,7 @@
             IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
                     resource.getId(), aResource.getPartition(), resourceType, indexOp);
-            txnCtx.register(resource.getId(), index, modCallback, false);
+            txnCtx.register(resource.getId(), aResource.getPartition(), index, modCallback, false);
             return modCallback;
         } catch (ACIDException e) {
             throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
index febcac2..97b4b35 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.transaction.management.opcallbacks;
 
-import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.context.BaseOperationTracker;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -36,7 +36,7 @@
     }
 
     @Override
-    public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
+    public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, int partition) {
         IDatasetLifecycleManager dslcManager =
                 ((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
         return new BaseOperationTracker(datasetID, dslcManager.getDatasetInfo(datasetID));
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index c2f512f..e7b633d 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -68,7 +68,7 @@
             IModificationOperationCallback modCallback = new UpsertOperationCallback(new DatasetId(datasetId),
                     primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(),
                     aResource.getPartition(), resourceType, indexOp);
-            txnCtx.register(resource.getId(), index, modCallback, true);
+            txnCtx.register(resource.getId(), aResource.getPartition(), index, modCallback, true);
             return modCallback;
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/DatasetLocalResourceFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/DatasetLocalResourceFactory.java
index 8724128..9f10404 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/DatasetLocalResourceFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/DatasetLocalResourceFactory.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.transaction.management.resource;
 
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
-import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.common.IResource;
 import org.apache.hyracks.storage.common.IResourceFactory;
@@ -36,10 +35,8 @@
     }
 
     @Override
-    public IResource createResource(FileReference fileRef) {
-        IResource resource = resourceFactory.createResource(fileRef);
-        // Currently, we get the partition number from the relative path
-        int partition = StoragePathUtil.getPartitionNumFromRelativePath(fileRef.getRelativePath());
+    public IResource createResource(FileReference fileRef, int partition) {
+        IResource resource = resourceFactory.createResource(fileRef, partition);
         return new DatasetLocalResource(datasetId, partition, resource);
     }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 011d2a1..3d7ff3f 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -181,7 +181,7 @@
                         }
                         endOffset = appendOffset;
                     }
-                internalFlush(flushOffset, endOffset);
+                    internalFlush(flushOffset, endOffset);
                 } catch (InterruptedException e) {
                     interrupted = true;
                 }
@@ -237,7 +237,7 @@
                         txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId);
                         txnSubsystem.getLockManager().unlock(reusableDatasetId, logRecord.getPKHashValue(),
                                 LockMode.ANY, txnCtx);
-                        txnCtx.notifyEntityCommitted();
+                        txnCtx.notifyEntityCommitted(logRecord.getResourcePartition());
                         if (txnSubsystem.getTransactionProperties().isCommitProfilerEnabled()) {
                             txnSubsystem.incrementEntityCommitCount();
                         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
index 43fe266..b3d5e49 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
@@ -118,7 +118,7 @@
     }
 
     @Override
-    public void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback,
+    public void register(long resourceId, int partition, ILSMIndex index, IModificationOperationCallback callback,
             boolean primaryIndex) {
         synchronized (txnOpTrackers) {
             if (!txnOpTrackers.containsKey(resourceId)) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
index 1d132a8..219cf07 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
@@ -44,9 +44,9 @@
     }
 
     @Override
-    public void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback,
+    public void register(long resourceId, int partition, ILSMIndex index, IModificationOperationCallback callback,
             boolean primaryIndex) {
-        super.register(resourceId, index, callback, primaryIndex);
+        super.register(resourceId, partition, index, callback, primaryIndex);
         synchronized (txnOpTrackers) {
             if (primaryIndex && !opTrackers.containsKey(resourceId)) {
                 opTrackers.put(resourceId, index.getOperationTracker());
@@ -67,7 +67,7 @@
     }
 
     @Override
-    public void notifyEntityCommitted() {
+    public void notifyEntityCommitted(int partition) {
         throw new IllegalStateException("Unexpected entity commit in atomic transaction");
     }
 
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
index e195451..2a0740c 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
@@ -18,11 +18,15 @@
  */
 package org.apache.asterix.transaction.management.service.transaction;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -32,30 +36,36 @@
 @ThreadSafe
 public class EntityLevelTransactionContext extends AbstractTransactionContext {
 
-    private PrimaryIndexOperationTracker primaryIndexOpTracker;
-    private IModificationOperationCallback primaryIndexCallback;
-    private final AtomicInteger pendingOps;
+    private final Map<Integer, Pair<PrimaryIndexOperationTracker, IModificationOperationCallback>> primaryIndexTrackers;
+    private final Map<Long, AtomicInteger> resourcePendingOps;
+    private final Map<Integer, AtomicInteger> partitionPendingOps;
 
     public EntityLevelTransactionContext(TxnId txnId) {
         super(txnId);
-        pendingOps = new AtomicInteger(0);
+        this.primaryIndexTrackers = new HashMap<>();
+        this.resourcePendingOps = new HashMap<>();
+        this.partitionPendingOps = new HashMap<>();
     }
 
     @Override
-    public void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback,
+    public void register(long resourceId, int partition, ILSMIndex index, IModificationOperationCallback callback,
             boolean primaryIndex) {
-        super.register(resourceId, index, callback, primaryIndex);
+        super.register(resourceId, partition, index, callback, primaryIndex);
+        AtomicInteger pendingOps = partitionPendingOps.computeIfAbsent(partition, p -> new AtomicInteger(0));
+        resourcePendingOps.put(resourceId, pendingOps);
         synchronized (txnOpTrackers) {
-            if (primaryIndex && primaryIndexOpTracker == null) {
-                primaryIndexCallback = callback;
-                primaryIndexOpTracker = (PrimaryIndexOperationTracker) index.getOperationTracker();
+            if (primaryIndex) {
+                Pair<PrimaryIndexOperationTracker, IModificationOperationCallback> pair =
+                        new Pair<PrimaryIndexOperationTracker, IModificationOperationCallback>(
+                                (PrimaryIndexOperationTracker) index.getOperationTracker(), callback);
+                primaryIndexTrackers.put(partition, pair);
             }
         }
     }
 
     @Override
     public void beforeOperation(long resourceId) {
-        pendingOps.incrementAndGet();
+        resourcePendingOps.get(resourceId).incrementAndGet();
     }
 
     @Override
@@ -64,9 +74,11 @@
     }
 
     @Override
-    public void notifyEntityCommitted() {
+    public void notifyEntityCommitted(int partition) {
         try {
-            primaryIndexOpTracker.completeOperation(null, LSMOperationType.MODIFICATION, null, primaryIndexCallback);
+            Pair<PrimaryIndexOperationTracker, IModificationOperationCallback> pair =
+                    primaryIndexTrackers.get(partition);
+            pair.first.completeOperation(null, LSMOperationType.MODIFICATION, null, pair.second);
         } catch (HyracksDataException e) {
             throw new ACIDException(e);
         }
@@ -74,13 +86,14 @@
 
     @Override
     public void afterOperation(long resourceId) {
-        pendingOps.decrementAndGet();
+        resourcePendingOps.get(resourceId).decrementAndGet();
     }
 
     @Override
     protected void cleanupForAbort() {
-        if (primaryIndexOpTracker != null) {
-            primaryIndexOpTracker.cleanupNumActiveOperationsForAbortedJob(pendingOps.get());
+        for(Entry<Integer, Pair<PrimaryIndexOperationTracker, IModificationOperationCallback>>e:primaryIndexTrackers.entrySet()) {
+            AtomicInteger pendingOps = partitionPendingOps.get(e.getKey());
+            e.getValue().first.cleanupNumActiveOperationsForAbortedJob(pendingOps.get());
         }
     }
 
diff --git a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
index f897aca..600fbf8 100644
--- a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
+++ b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.transaction.management.service.locking;
 
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.*;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -64,7 +64,7 @@
     }
 
     @Override
-    public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
+    public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID, int partition) {
         throw new UnsupportedOperationException();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResourceFactory.java
index 7a21495..18a9c98 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResourceFactory.java
@@ -43,7 +43,7 @@
     }
 
     @Override
-    public IResource createResource(FileReference fileRef) {
+    public IResource createResource(FileReference fileRef, int partition) {
         return new BTreeResource(fileRef.getRelativePath(), storageManager, typeTraits, comparatorFactories,
                 pageManagerFactory);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
index f62860a..c1222b2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
@@ -46,6 +46,7 @@
     protected final FileReference resourceRef;
     protected final IResourceFactory localResourceFactory;
     protected final boolean durable;
+    protected final int partition;
     private final IResourceIdFactory resourceIdFactory;
 
     /*
@@ -54,13 +55,14 @@
      * with specific resource ids. See MetadataBootstrap
      */
     public IndexBuilder(INCServiceContext ctx, IStorageManager storageManager, IResourceIdFactory resourceIdFactory,
-            FileReference resourceRef, IResourceFactory localResourceFactory, boolean durable)
+            FileReference resourceRef, IResourceFactory localResourceFactory, boolean durable, int partition)
             throws HyracksDataException {
         this.ctx = ctx;
         this.storageManager = storageManager;
         this.resourceIdFactory = resourceIdFactory;
         this.localResourceFactory = localResourceFactory;
         this.durable = durable;
+        this.partition = partition;
         this.resourceRef = resourceRef;
     }
 
@@ -78,7 +80,7 @@
                 localResourceRepository.delete(resourceRef.getRelativePath());
             }
             resourceId = resourceIdFactory.createId();
-            IResource resource = localResourceFactory.createResource(resourceRef);
+            IResource resource = localResourceFactory.createResource(resourceRef, partition);
             lr = new LocalResource(resourceId, ITreeIndexFrame.Constants.VERSION, durable, resource);
             IIndex index = lcManager.get(resourceRef.getRelativePath());
             if (index != null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilderFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilderFactory.java
index 0f79788..4221671 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilderFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilderFactory.java
@@ -48,6 +48,6 @@
         FileReference resourceRef = fileSplitProvider.getFileSplits()[partition].getFileReference(ctx.getIoManager());
         return new IndexBuilder(ctx.getJobletContext().getServiceContext(), storageManager,
                 storageManager.getResourceIdFactory(ctx.getJobletContext().getServiceContext()), resourceRef,
-                localResourceFactory, durable);
+                localResourceFactory, durable, partition);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
index 673bd3b..777769d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
@@ -46,22 +46,23 @@
             IBinaryComparatorFactory[] filterCmpFactories, int[] btreeFields, int[] filterFields,
             ILSMOperationTrackerFactory opTrackerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
             IMetadataPageManagerFactory metadataPageManagerFactory,
-            ILSMIOOperationSchedulerProvider ioSchedulerProvider, boolean durable) {
+            ILSMIOOperationSchedulerProvider ioSchedulerProvider, boolean durable, int partition) {
         super(typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, path,
                 storageManager, mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories,
                 btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, null,
-                ioSchedulerProvider, durable);
+                ioSchedulerProvider, durable, partition);
     }
 
     @Override
     public ILSMIndex createInstance(INCServiceContext serviceCtx) throws HyracksDataException {
         IIOManager ioManager = serviceCtx.getIoManager();
         FileReference file = ioManager.resolve(path);
-        ioOpCallbackFactory.initialize(serviceCtx);
+        ioOpCallbackFactory.initialize(serviceCtx, partition);
         return LSMBTreeUtil.createExternalBTree(ioManager, file, storageManager.getBufferCache(serviceCtx), typeTraits,
                 cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
                 mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
-                opTrackerProvider.getOperationTracker(serviceCtx), ioSchedulerProvider.getIoScheduler(serviceCtx),
-                ioOpCallbackFactory, durable, metadataPageManagerFactory, serviceCtx.getTracer());
+                opTrackerProvider.getOperationTracker(serviceCtx, partition),
+                ioSchedulerProvider.getIoScheduler(serviceCtx), ioOpCallbackFactory, durable,
+                metadataPageManagerFactory, serviceCtx.getTracer());
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResourceFactory.java
index a4c24c9..e19e505 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResourceFactory.java
@@ -50,10 +50,11 @@
     }
 
     @Override
-    public LsmResource createResource(FileReference fileRef) {
+    public LsmResource createResource(FileReference fileRef, int partition) {
         return new ExternalBTreeLocalResource(typeTraits, cmpFactories, bloomFilterKeyFields,
                 bloomFilterFalsePositiveRate, isPrimary, fileRef.getRelativePath(), storageManager, mergePolicyFactory,
                 mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields, bloomFilterKeyFields,
-                opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, ioSchedulerProvider, durable);
+                opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, ioSchedulerProvider, durable,
+                partition);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
index 7e44c63..91da804 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
@@ -49,22 +49,23 @@
             IBinaryComparatorFactory[] filterCmpFactories, int[] btreeFields, int[] filterFields,
             ILSMOperationTrackerFactory opTrackerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
             IMetadataPageManagerFactory metadataPageManagerFactory,
-            ILSMIOOperationSchedulerProvider ioSchedulerProvider, boolean durable) {
+            ILSMIOOperationSchedulerProvider ioSchedulerProvider, boolean durable, int partition) {
         super(typeTraits, cmpFactories, buddyBtreeFields, bloomFilterFalsePositiveRate, isPrimary, path, storageManager,
                 mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields,
                 filterFields, opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, null,
-                ioSchedulerProvider, durable);
+                ioSchedulerProvider, durable, partition);
     }
 
     @Override
     public ILSMIndex createInstance(INCServiceContext serviceCtx) throws HyracksDataException {
         IIOManager ioManager = serviceCtx.getIoManager();
         FileReference file = ioManager.resolve(path);
-        ioOpCallbackFactory.initialize(serviceCtx);
+        ioOpCallbackFactory.initialize(serviceCtx, partition);
         return LSMBTreeUtil.createExternalBTreeWithBuddy(ioManager, file, storageManager.getBufferCache(serviceCtx),
                 typeTraits, cmpFactories, bloomFilterFalsePositiveRate,
                 mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
-                opTrackerProvider.getOperationTracker(serviceCtx), ioSchedulerProvider.getIoScheduler(serviceCtx),
-                ioOpCallbackFactory, bloomFilterKeyFields, durable, metadataPageManagerFactory, serviceCtx.getTracer());
+                opTrackerProvider.getOperationTracker(serviceCtx, partition),
+                ioSchedulerProvider.getIoScheduler(serviceCtx), ioOpCallbackFactory, bloomFilterKeyFields, durable,
+                metadataPageManagerFactory, serviceCtx.getTracer());
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResourceFactory.java
index 2aff61a..89a72e6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResourceFactory.java
@@ -50,10 +50,11 @@
     }
 
     @Override
-    public LsmResource createResource(FileReference fileRef) {
+    public LsmResource createResource(FileReference fileRef, int partition) {
         return new ExternalBTreeWithBuddyLocalResource(typeTraits, cmpFactories, bloomFilterKeyFields,
                 bloomFilterFalsePositiveRate, isPrimary, fileRef.getRelativePath(), storageManager, mergePolicyFactory,
                 mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields, filterFields,
-                opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, ioSchedulerProvider, durable);
+                opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, ioSchedulerProvider, durable,
+                partition);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
index 1988736..911250f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
@@ -55,10 +55,10 @@
             IBinaryComparatorFactory[] filterCmpFactories, int[] btreeFields, int[] filterFields,
             ILSMOperationTrackerFactory opTrackerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
             IMetadataPageManagerFactory metadataPageManagerFactory, IVirtualBufferCacheProvider vbcProvider,
-            ILSMIOOperationSchedulerProvider ioSchedulerProvider, boolean durable) {
+            ILSMIOOperationSchedulerProvider ioSchedulerProvider, boolean durable, int partition) {
         super(path, storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
                 opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
-                mergePolicyFactory, mergePolicyProperties, durable);
+                mergePolicyFactory, mergePolicyProperties, durable, partition);
         this.bloomFilterKeyFields = bloomFilterKeyFields;
         this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
         this.isPrimary = isPrimary;
@@ -70,14 +70,15 @@
         IIOManager ioManager = serviceCtx.getIoManager();
         FileReference file = ioManager.resolve(path);
         List<IVirtualBufferCache> vbcs = vbcProvider.getVirtualBufferCaches(serviceCtx, file);
-        ioOpCallbackFactory.initialize(serviceCtx);
+        ioOpCallbackFactory.initialize(serviceCtx, partition);
         //TODO: enable updateAwareness for secondary LSMBTree indexes
         boolean updateAware = false;
         return LSMBTreeUtil.createLSMTree(ioManager, vbcs, file, storageManager.getBufferCache(serviceCtx), typeTraits,
                 cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
                 mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
-                opTrackerProvider.getOperationTracker(serviceCtx), ioSchedulerProvider.getIoScheduler(serviceCtx),
-                ioOpCallbackFactory, isPrimary, filterTypeTraits, filterCmpFactories, btreeFields, filterFields,
-                durable, metadataPageManagerFactory, updateAware, serviceCtx.getTracer());
+                opTrackerProvider.getOperationTracker(serviceCtx, partition),
+                ioSchedulerProvider.getIoScheduler(serviceCtx), ioOpCallbackFactory, isPrimary, filterTypeTraits,
+                filterCmpFactories, btreeFields, filterFields, durable, metadataPageManagerFactory, updateAware,
+                serviceCtx.getTracer());
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
index 5fae5b9..8770d6c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
@@ -59,10 +59,10 @@
     }
 
     @Override
-    public LsmResource createResource(FileReference fileRef) {
+    public LsmResource createResource(FileReference fileRef, int partition) {
         return new LSMBTreeLocalResource(typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
                 isPrimary, fileRef.getRelativePath(), storageManager, mergePolicyFactory, mergePolicyProperties,
                 filterTypeTraits, filterCmpFactories, btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory,
-                metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, durable);
+                metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, durable, partition);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
index c0f530b..4095183 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
@@ -24,6 +24,5 @@
 
 @FunctionalInterface
 public interface ILSMComponentIdGeneratorFactory extends Serializable {
-
-    ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx);
+    ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx, int partition);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
index a9dc50e..a3d7acc 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
@@ -24,11 +24,11 @@
 
 public interface ILSMIOOperationCallbackFactory extends Serializable {
     /**
-     * Initialize the callback factory with the given ncCtx
+     * Initialize the callback factory with the given ncCtx and partition
      *
      * @param ncCtx
      */
-    void initialize(INCServiceContext ncCtx);
+    void initialize(INCServiceContext ncCtx, int partition);
 
     ILSMIOOperationCallback createIoOpCallback(ILSMIndex index);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java
index 217f794..820b416 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java
@@ -24,5 +24,5 @@
 
 @FunctionalInterface
 public interface ILSMOperationTrackerFactory extends Serializable {
-    ILSMOperationTracker getOperationTracker(INCServiceContext ctx);
+    ILSMOperationTracker getOperationTracker(INCServiceContext ctx, int partition);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java
index b541750..8c8d901 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java
@@ -55,6 +55,7 @@
     protected final ILSMMergePolicyFactory mergePolicyFactory;
     protected final Map<String, String> mergePolicyProperties;
     protected final boolean durable;
+    protected final int partition;
 
     public LsmResource(String path, IStorageManager storageManager, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] cmpFactories, ITypeTraits[] filterTypeTraits,
@@ -62,7 +63,7 @@
             ILSMOperationTrackerFactory opTrackerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
             IMetadataPageManagerFactory metadataPageManagerFactory, IVirtualBufferCacheProvider vbcProvider,
             ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMMergePolicyFactory mergePolicyFactory,
-            Map<String, String> mergePolicyProperties, boolean durable) {
+            Map<String, String> mergePolicyProperties, boolean durable, int partition) {
         this.path = path;
         this.storageManager = storageManager;
         this.typeTraits = typeTraits;
@@ -78,6 +79,7 @@
         this.mergePolicyFactory = mergePolicyFactory;
         this.mergePolicyProperties = mergePolicyProperties;
         this.durable = durable;
+        this.partition = partition;
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
index 728c90a..cc53c09 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
@@ -32,7 +32,7 @@
     private static final long serialVersionUID = 1L;
 
     @Override
-    public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) {
+    public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx, int partition) {
         return new LSMComponentIdGenerator();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
index 3a58c19..c3d96ac 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
@@ -39,7 +39,7 @@
     }
 
     @Override
-    public void initialize(INCServiceContext ncCtx) {
+    public void initialize(INCServiceContext ncCtx, int partition) {
         // No op
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java
index 55a2164..d615a07 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java
@@ -43,7 +43,7 @@
     }
 
     @Override
-    public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
+    public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, int partition) {
         return tracker;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java
index d01e7ba..2e9819f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java
@@ -32,7 +32,7 @@
     }
 
     @Override
-    public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
+    public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, int partition) {
         return new ThreadCountingTracker();
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java
index a45f006..15a0e23 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java
@@ -65,10 +65,10 @@
             Map<String, String> mergePolicyProperties, boolean durable, ITypeTraits[] tokenTypeTraits,
             IBinaryComparatorFactory[] tokenCmpFactories, IBinaryTokenizerFactory tokenizerFactory,
             boolean isPartitioned, int[] invertedIndexFields, int[] filterFieldsForNonBulkLoadOps,
-            int[] invertedIndexFieldsForNonBulkLoadOps, double bloomFilterFalsePositiveRate) {
+            int[] invertedIndexFieldsForNonBulkLoadOps, double bloomFilterFalsePositiveRate, int partition) {
         super(path, storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
                 opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
-                mergePolicyFactory, mergePolicyProperties, durable);
+                mergePolicyFactory, mergePolicyProperties, durable, partition);
         this.tokenTypeTraits = tokenTypeTraits;
         this.tokenCmpFactories = tokenCmpFactories;
         this.tokenizerFactory = tokenizerFactory;
@@ -87,21 +87,22 @@
         IBufferCache bufferCache = storageManager.getBufferCache(serviceCtx);
         ILSMMergePolicy mergePolicy = mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx);
         ILSMIOOperationScheduler ioScheduler = ioSchedulerProvider.getIoScheduler(serviceCtx);
-        ioOpCallbackFactory.initialize(serviceCtx);
+        ioOpCallbackFactory.initialize(serviceCtx, partition);
         if (isPartitioned) {
             return InvertedIndexUtils.createPartitionedLSMInvertedIndex(ioManager, virtualBufferCaches, typeTraits,
                     cmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory, bufferCache,
                     file.getAbsolutePath(), bloomFilterFalsePositiveRate, mergePolicy,
-                    opTrackerProvider.getOperationTracker(serviceCtx), ioScheduler, ioOpCallbackFactory,
+                    opTrackerProvider.getOperationTracker(serviceCtx, partition), ioScheduler, ioOpCallbackFactory,
                     invertedIndexFields, filterTypeTraits, filterCmpFactories, filterFields,
                     filterFieldsForNonBulkLoadOps, invertedIndexFieldsForNonBulkLoadOps, durable,
                     metadataPageManagerFactory);
         } else {
             return InvertedIndexUtils.createLSMInvertedIndex(ioManager, virtualBufferCaches, typeTraits, cmpFactories,
                     tokenTypeTraits, tokenCmpFactories, tokenizerFactory, bufferCache, file.getAbsolutePath(),
-                    bloomFilterFalsePositiveRate, mergePolicy, opTrackerProvider.getOperationTracker(serviceCtx),
-                    ioScheduler, ioOpCallbackFactory, invertedIndexFields, filterTypeTraits, filterCmpFactories,
-                    filterFields, filterFieldsForNonBulkLoadOps, invertedIndexFieldsForNonBulkLoadOps, durable,
+                    bloomFilterFalsePositiveRate, mergePolicy,
+                    opTrackerProvider.getOperationTracker(serviceCtx, partition), ioScheduler, ioOpCallbackFactory,
+                    invertedIndexFields, filterTypeTraits, filterCmpFactories, filterFields,
+                    filterFieldsForNonBulkLoadOps, invertedIndexFieldsForNonBulkLoadOps, durable,
                     metadataPageManagerFactory);
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResourceFactory.java
index 50fedb5..10c9e27 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResourceFactory.java
@@ -70,12 +70,13 @@
     }
 
     @Override
-    public IResource createResource(FileReference fileRef) {
+    public IResource createResource(FileReference fileRef, int partition) {
         return new LSMInvertedIndexLocalResource(fileRef.getRelativePath(), storageManager, typeTraits, cmpFactories,
                 filterTypeTraits, filterCmpFactories, filterFields, opTrackerProvider, ioOpCallbackFactory,
                 metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties,
                 durable, tokenTypeTraits, tokenCmpFactories, tokenizerFactory, isPartitioned, invertedIndexFields,
-                filterFieldsForNonBulkLoadOps, invertedIndexFieldsForNonBulkLoadOps, bloomFilterFalsePositiveRate);
+                filterFieldsForNonBulkLoadOps, invertedIndexFieldsForNonBulkLoadOps, bloomFilterFalsePositiveRate,
+                partition);
     }
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java
index 9960590..27bb04d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java
@@ -54,25 +54,25 @@
             Map<String, String> mergePolicyProperties, boolean durable, IBinaryComparatorFactory[] btreeCmpFactories,
             IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
             ILinearizeComparatorFactory linearizeCmpFactory, int[] rtreeFields, int[] buddyBTreeFields,
-            boolean isPointMBR, double bloomFilterFalsePositiveRate) {
+            boolean isPointMBR, double bloomFilterFalsePositiveRate, int partition) {
         super(path, storageManager, typeTraits, rtreeCmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
                 opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, null, ioSchedulerProvider,
                 mergePolicyFactory, mergePolicyProperties, durable, btreeCmpFactories, valueProviderFactories,
                 rtreePolicyType, linearizeCmpFactory, rtreeFields, buddyBTreeFields, isPointMBR,
-                bloomFilterFalsePositiveRate);
+                bloomFilterFalsePositiveRate, partition);
     }
 
     @Override
     public IIndex createInstance(INCServiceContext ncServiceCtx) throws HyracksDataException {
         IIOManager ioManager = ncServiceCtx.getIoManager();
         FileReference fileRef = ioManager.resolve(path);
-        ioOpCallbackFactory.initialize(ncServiceCtx);
+        ioOpCallbackFactory.initialize(ncServiceCtx, partition);
         return LSMRTreeUtils.createExternalRTree(ioManager, fileRef, storageManager.getBufferCache(ncServiceCtx),
                 typeTraits, cmpFactories, btreeCmpFactories, valueProviderFactories, rtreePolicyType,
                 bloomFilterFalsePositiveRate, mergePolicyFactory.createMergePolicy(mergePolicyProperties, ncServiceCtx),
-                opTrackerProvider.getOperationTracker(ncServiceCtx), ioSchedulerProvider.getIoScheduler(ncServiceCtx),
-                ioOpCallbackFactory, linearizeCmpFactory, buddyBTreeFields, durable, isPointMBR,
-                metadataPageManagerFactory, ncServiceCtx.getTracer());
+                opTrackerProvider.getOperationTracker(ncServiceCtx, partition),
+                ioSchedulerProvider.getIoScheduler(ncServiceCtx), ioOpCallbackFactory, linearizeCmpFactory,
+                buddyBTreeFields, durable, isPointMBR, metadataPageManagerFactory, ncServiceCtx.getTracer());
 
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResourceFactory.java
index a5a7cd8..eff3c6a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResourceFactory.java
@@ -56,12 +56,12 @@
     }
 
     @Override
-    public LsmResource createResource(FileReference fileRef) {
+    public LsmResource createResource(FileReference fileRef, int partition) {
         return new ExternalRTreeLocalResource(fileRef.getRelativePath(), storageManager, typeTraits, cmpFactories,
                 filterTypeTraits, filterCmpFactories, filterFields, opTrackerProvider, ioOpCallbackFactory,
                 metadataPageManagerFactory, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties, durable,
                 btreeCmpFactories, valueProviderFactories, rtreePolicyType, linearizeCmpFactory, rtreeFields,
-                buddyBTreeFields, isPointMBR, bloomFilterFalsePositiveRate);
+                buddyBTreeFields, isPointMBR, bloomFilterFalsePositiveRate, partition);
     }
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java
index f6396cf..126a780 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java
@@ -64,10 +64,10 @@
             Map<String, String> mergePolicyProperties, boolean durable, IBinaryComparatorFactory[] btreeCmpFactories,
             IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
             ILinearizeComparatorFactory linearizeCmpFactory, int[] rtreeFields, int[] buddyBTreeFields,
-            boolean isPointMBR, double bloomFilterFalsePositiveRate) {
+            boolean isPointMBR, double bloomFilterFalsePositiveRate, int partition) {
         super(path, storageManager, typeTraits, rtreeCmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
                 opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
-                mergePolicyFactory, mergePolicyProperties, durable);
+                mergePolicyFactory, mergePolicyProperties, durable, partition);
         this.btreeCmpFactories = btreeCmpFactories;
         this.valueProviderFactories = valueProviderFactories;
         this.rtreePolicyType = rtreePolicyType;
@@ -83,13 +83,14 @@
         IIOManager ioManager = ncServiceCtx.getIoManager();
         FileReference fileRef = ioManager.resolve(path);
         List<IVirtualBufferCache> virtualBufferCaches = vbcProvider.getVirtualBufferCaches(ncServiceCtx, fileRef);
-        ioOpCallbackFactory.initialize(ncServiceCtx);
+        ioOpCallbackFactory.initialize(ncServiceCtx, partition);
         return LSMRTreeUtils.createLSMTree(ioManager, virtualBufferCaches, fileRef,
                 storageManager.getBufferCache(ncServiceCtx), typeTraits, cmpFactories, btreeCmpFactories,
                 valueProviderFactories, rtreePolicyType, bloomFilterFalsePositiveRate,
                 mergePolicyFactory.createMergePolicy(mergePolicyProperties, ncServiceCtx),
-                opTrackerProvider.getOperationTracker(ncServiceCtx), ioSchedulerProvider.getIoScheduler(ncServiceCtx),
-                ioOpCallbackFactory, linearizeCmpFactory, rtreeFields, buddyBTreeFields, filterTypeTraits,
-                filterCmpFactories, filterFields, durable, isPointMBR, metadataPageManagerFactory);
+                opTrackerProvider.getOperationTracker(ncServiceCtx, partition),
+                ioSchedulerProvider.getIoScheduler(ncServiceCtx), ioOpCallbackFactory, linearizeCmpFactory, rtreeFields,
+                buddyBTreeFields, filterTypeTraits, filterCmpFactories, filterFields, durable, isPointMBR,
+                metadataPageManagerFactory);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResourceFactory.java
index 8d27caa..51fcc3a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResourceFactory.java
@@ -73,11 +73,11 @@
     }
 
     @Override
-    public IResource createResource(FileReference fileRef) {
+    public IResource createResource(FileReference fileRef, int partition) {
         return new LSMRTreeLocalResource(fileRef.getRelativePath(), storageManager, typeTraits, cmpFactories,
                 filterTypeTraits, filterCmpFactories, filterFields, opTrackerProvider, ioOpCallbackFactory,
                 metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties,
                 durable, btreeCmpFactories, valueProviderFactories, rtreePolicyType, linearizeCmpFactory, rtreeFields,
-                buddyBTreeFields, isPointMBR, bloomFilterFalsePositiveRate);
+                buddyBTreeFields, isPointMBR, bloomFilterFalsePositiveRate, partition);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java
index f586d51..ffcbb58 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java
@@ -61,10 +61,11 @@
             ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMMergePolicyFactory mergePolicyFactory,
             Map<String, String> mergePolicyProperties, IBinaryComparatorFactory[] btreeComparatorFactories,
             IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
-            ILinearizeComparatorFactory linearizeCmpFactory, int[] rtreeFields, boolean isPointMBR, boolean durable) {
+            ILinearizeComparatorFactory linearizeCmpFactory, int[] rtreeFields, boolean isPointMBR, boolean durable,
+            int partition) {
         super(path, storageManager, typeTraits, rtreeCmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
                 opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
-                mergePolicyFactory, mergePolicyProperties, durable);
+                mergePolicyFactory, mergePolicyProperties, durable, partition);
         this.btreeComparatorFactories = btreeComparatorFactories;
         this.valueProviderFactories = valueProviderFactories;
         this.rtreePolicyType = rtreePolicyType;
@@ -78,13 +79,13 @@
         IIOManager ioManager = serviceCtx.getIoManager();
         FileReference file = ioManager.resolve(path);
         List<IVirtualBufferCache> virtualBufferCaches = vbcProvider.getVirtualBufferCaches(serviceCtx, file);
-        ioOpCallbackFactory.initialize(serviceCtx);
+        ioOpCallbackFactory.initialize(serviceCtx, partition);
         return LSMRTreeUtils.createLSMTreeWithAntiMatterTuples(ioManager, virtualBufferCaches, file,
                 storageManager.getBufferCache(serviceCtx), typeTraits, cmpFactories, btreeComparatorFactories,
                 valueProviderFactories, rtreePolicyType,
                 mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
-                opTrackerProvider.getOperationTracker(serviceCtx), ioSchedulerProvider.getIoScheduler(serviceCtx),
-                ioOpCallbackFactory, linearizeCmpFactory, rtreeFields, filterTypeTraits, filterCmpFactories,
-                filterFields, durable, isPointMBR, metadataPageManagerFactory);
+                opTrackerProvider.getOperationTracker(serviceCtx, partition),
+                ioSchedulerProvider.getIoScheduler(serviceCtx), ioOpCallbackFactory, linearizeCmpFactory, rtreeFields,
+                filterTypeTraits, filterCmpFactories, filterFields, durable, isPointMBR, metadataPageManagerFactory);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResourceFactory.java
index e633c2a..116b10b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResourceFactory.java
@@ -68,11 +68,11 @@
     }
 
     @Override
-    public LsmResource createResource(FileReference fileRef) {
+    public LsmResource createResource(FileReference fileRef, int partition) {
         return new LSMRTreeWithAntiMatterLocalResource(fileRef.getRelativePath(), storageManager, typeTraits,
                 cmpFactories, filterTypeTraits, filterCmpFactories, filterFields, opTrackerProvider,
                 ioOpCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, mergePolicyFactory,
                 mergePolicyProperties, btreeComparatorFactories, valueProviderFactories, rtreePolicyType,
-                linearizeCmpFactory, rtreeFields, isPointMBR, durable);
+                linearizeCmpFactory, rtreeFields, isPointMBR, durable, partition);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResourceFactory.java
index 1a3bb0b..d50e2ec 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResourceFactory.java
@@ -50,7 +50,7 @@
     }
 
     @Override
-    public IResource createResource(FileReference fileRef) {
+    public IResource createResource(FileReference fileRef, int partition) {
         return new RTreeResource(fileRef.getRelativePath(), storageManager, typeTraits, comparatorFactories,
                 pageManagerFactory, valueProviderFactories, rtreePolicyType);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceFactory.java
index e25201f..e546d60 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceFactory.java
@@ -29,6 +29,8 @@
      *
      * @param fileRef
      *            the file reference for the resource
+     * @param partition
+     *            the partition for the resource
      */
-    IResource createResource(FileReference fileRef);
+    IResource createResource(FileReference fileRef, int partition);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
index 1af6779..d9eeeb1 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
@@ -49,7 +49,7 @@
                 harness.getFileReference(), harness.getDiskBufferCache(), SerdeUtils.serdesToTypeTraits(keySerdes),
                 SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length), bloomFilterKeyFields,
                 harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
-                NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null), harness.getIOScheduler(),
+                NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null, 0), harness.getIOScheduler(),
                 harness.getIOOperationCallbackFactory(), true, null, null, null, null, true,
                 harness.getMetadataPageManagerFactory(), false, ITracer.NONE);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
index 0904806..9213fdb 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
@@ -59,7 +59,7 @@
                 harness.getFileReference(), harness.getDiskBufferCache(), SerdeUtils.serdesToTypeTraits(keySerdes),
                 SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length), bloomFilterKeyFields,
                 harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
-                NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null), harness.getIOScheduler(),
+                NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null, 0), harness.getIOScheduler(),
                 harness.getIOOperationCallbackFactory(), true, null, null, null, null, true,
                 harness.getMetadataPageManagerFactory(), false, ITracer.NONE);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
index ccbaa9c..933d992 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
@@ -71,7 +71,7 @@
                 harness.getFileReference(), harness.getDiskBufferCache(), SerdeUtils.serdesToTypeTraits(keySerdes),
                 SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length), bloomFilterKeyFields,
                 harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
-                NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null), harness.getIOScheduler(),
+                NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null, 0), harness.getIOScheduler(),
                 harness.getIOOperationCallbackFactory(), true, null, null, null, null, true,
                 harness.getMetadataPageManagerFactory(), true, ITracer.NONE);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
index 9b53120..a727d10 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
@@ -48,11 +48,11 @@
             IBinaryComparatorFactory[] filterCmpFactories, int[] btreeFields, int[] filterFields,
             ILSMOperationTrackerFactory opTrackerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
             IMetadataPageManagerFactory metadataPageManagerFactory, IVirtualBufferCacheProvider vbcProvider,
-            ILSMIOOperationSchedulerProvider ioSchedulerProvider, boolean durable) {
+            ILSMIOOperationSchedulerProvider ioSchedulerProvider, boolean durable, int partition) {
         super(typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, path,
                 storageManager, mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories,
                 btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory,
-                vbcProvider, ioSchedulerProvider, durable);
+                vbcProvider, ioSchedulerProvider, durable, partition);
     }
 
     @Override
@@ -67,12 +67,13 @@
                 vbcs.add(i, new TestVirtualBufferCache(vbc));
             }
         }
-        ioOpCallbackFactory.initialize(serviceCtx);
+        ioOpCallbackFactory.initialize(serviceCtx, partition);
         return TestLsmBtreeUtil.createLSMTree(ioManager, vbcs, file, storageManager.getBufferCache(serviceCtx),
                 typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
                 mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
-                opTrackerProvider.getOperationTracker(serviceCtx), ioSchedulerProvider.getIoScheduler(serviceCtx),
-                ioOpCallbackFactory, isPrimary, filterTypeTraits, filterCmpFactories, btreeFields, filterFields,
-                durable, metadataPageManagerFactory, false, serviceCtx.getTracer());
+                opTrackerProvider.getOperationTracker(serviceCtx, partition),
+                ioSchedulerProvider.getIoScheduler(serviceCtx), ioOpCallbackFactory, isPrimary, filterTypeTraits,
+                filterCmpFactories, btreeFields, filterFields, durable, metadataPageManagerFactory, false,
+                serviceCtx.getTracer());
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
index 6b13f56..e17020d 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
@@ -51,11 +51,11 @@
     }
 
     @Override
-    public LsmResource createResource(FileReference fileRef) {
+    public LsmResource createResource(FileReference fileRef, int partition) {
         return new TestLsmBtreeLocalResource(typeTraits, cmpFactories, bloomFilterKeyFields,
                 bloomFilterFalsePositiveRate, isPrimary, fileRef.getRelativePath(), storageManager, mergePolicyFactory,
                 mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields, filterFields,
                 opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
-                durable);
+                durable, partition);
     }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2263
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I9eb3854d2343e45beeccb87b0d434e5f4efd69c9
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Luo Chen <cluo8@uci.edu>

Mime
View raw message