asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhub...@apache.org
Subject [3/3] asterixdb git commit: [NO ISSUE][STO] Introduce Index Checkpoints
Date Thu, 07 Dec 2017 23:09:27 GMT
[NO ISSUE][STO] Introduce Index Checkpoints

- user model changes: no
- storage format changes: yes
  - Add index checkpoints.
  - Use index checkpoint to determine low watermark
    during recovery.
- interface changes: yes
  - Introduce IIndexCheckpointManager for managing
    indexes checkpoints.
  - Introduce IIndexCheckpointProvider for tracking
    IIndexCheckpointManager references.

Details:
- Unify LSM flush/merge operations completion order.
- Introduce index checkpoints which contains:
   - Index low watermark.
   - Latest valid LSM component
   - Mapping between master replica and local replica.
- Use index checkpoints instead of LSM component metadata
  for identifying low watermark in recovery.
- Use index checkpoints in replication instead of overwriting
  LSN byte offset in replica component metadata.
- Replace LSN_MAP used in replication by index checkpoints.
- Replace NIO Files.find by Commons FileUtils.listFiles to
  avoid no NoSuchFileException on any file deletion.

Change-Id: Ib22800002bf8ea3660242e599b3f5f20678301a8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2200
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/929344e9
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/929344e9
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/929344e9

Branch: refs/heads/master
Commit: 929344e93049ea133d07fea4d7ff5c9c8e8ed46c
Parents: 3180d87
Author: Murtadha Hubail <mhubail@apache.org>
Authored: Thu Dec 7 23:25:41 2017 +0300
Committer: Murtadha Hubail <mhubail@apache.org>
Committed: Thu Dec 7 15:08:42 2017 -0800

----------------------------------------------------------------------
 .../asterix/app/nc/IndexCheckpointManager.java  | 192 +++++++++++++++++++
 .../app/nc/IndexCheckpointManagerProvider.java  |  66 +++++++
 .../asterix/app/nc/NCAppRuntimeContext.java     |  21 +-
 .../apache/asterix/app/nc/RecoveryManager.java  |  14 +-
 .../TestLsmBtreeIoOpCallbackFactory.java        |  12 +-
 asterixdb/asterix-common/pom.xml                |  12 --
 .../common/api/INcApplicationContext.java       |   4 +
 .../common/context/BaseOperationTracker.java    |   7 +-
 .../common/context/DatasetLifecycleManager.java |  33 ++--
 .../asterix/common/context/DatasetResource.java |   2 +-
 .../asterix/common/context/IndexInfo.java       |  11 +-
 .../context/PrimaryIndexOperationTracker.java   |   8 +-
 .../asterix/common/dataflow/LSMIndexUtil.java   |   7 -
 .../AbstractLSMIOOperationCallback.java         |  97 ++++++----
 ...tractLSMIndexIOOperationCallbackFactory.java |   6 +
 .../LSMBTreeIOOperationCallback.java            |  22 +--
 .../LSMBTreeIOOperationCallbackFactory.java     |   2 +-
 .../LSMBTreeWithBuddyIOOperationCallback.java   |  22 +--
 ...TreeWithBuddyIOOperationCallbackFactory.java |   3 +-
 .../LSMInvertedIndexIOOperationCallback.java    |  23 +--
 ...InvertedIndexIOOperationCallbackFactory.java |   3 +-
 .../LSMRTreeIOOperationCallback.java            |  22 +--
 .../LSMRTreeIOOperationCallbackFactory.java     |   2 +-
 .../storage/DatasetResourceReference.java       |  28 +++
 .../common/storage/IIndexCheckpointManager.java |  93 +++++++++
 .../IIndexCheckpointManagerProvider.java        |  40 ++++
 .../asterix/common/storage/IndexCheckpoint.java |  98 ++++++++++
 .../common/storage/ResourceReference.java       |  22 +++
 .../asterix/common/utils/StorageConstants.java  |   7 +
 .../CorrelatedPrefixMergePolicyTest.java        |   5 +-
 .../AbstractLSMIOOperationCallbackTest.java     |  72 ++++---
 .../LSMBTreeIOOperationCallbackTest.java        |   6 +-
 ...SMBTreeWithBuddyIOOperationCallbackTest.java |   6 +-
 ...LSMInvertedIndexIOOperationCallbackTest.java |   6 +-
 .../LSMRTreeIOOperationCallbackTest.java        |   6 +-
 .../installer/test/AsterixLifecycleIT.java      | 105 ----------
 .../replication/logging/RemoteLogMapping.java   |   4 -
 .../management/ReplicationChannel.java          | 188 +++++++++---------
 .../management/ReplicationManager.java          |  49 ++---
 .../storage/LSMComponentLSNSyncTask.java        |  16 +-
 .../storage/LSMComponentProperties.java         |   5 +-
 .../storage/LSMIndexFileProperties.java         |  21 +-
 .../storage/ReplicaResourcesManager.java        | 118 +++---------
 .../PersistentLocalResourceRepository.java      |  80 +++++---
 ...ersistentLocalResourceRepositoryFactory.java |   8 +-
 .../management/service/logging/LogBuffer.java   |  31 +--
 .../impls/AbstractLSMIndexFileManager.java      |   4 +
 .../storage/am/lsm/common/impls/LSMHarness.java |  92 +++++----
 48 files changed, 1043 insertions(+), 658 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
new file mode 100644
index 0000000..446d04d
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IndexCheckpoint;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+public class IndexCheckpointManager implements IIndexCheckpointManager {
+
+    private static final Logger LOGGER = Logger.getLogger(IndexCheckpointManager.class.getName());
+    private static final int HISTORY_CHECKPOINTS = 1;
+    private static final int MAX_CHECKPOINT_WRITE_ATTEMPTS = 5;
+    private static final FilenameFilter CHECKPOINT_FILE_FILTER =
+            (file, name) -> name.startsWith(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX);
+    private static final long BULKLOAD_LSN = 0;
+    private final Path indexPath;
+
+    public IndexCheckpointManager(Path indexPath) {
+        this.indexPath = indexPath;
+    }
+
+    @Override
+    public synchronized void init(long lsn) throws HyracksDataException {
+        final List<IndexCheckpoint> checkpoints = getCheckpoints();
+        if (!checkpoints.isEmpty()) {
+            LOGGER.warning(() -> "Checkpoints found on initializing: " + indexPath);
+            delete();
+        }
+        IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(lsn);
+        persist(firstCheckpoint);
+    }
+
+    @Override
+    public synchronized void replicated(String componentTimestamp, long masterLsn) throws HyracksDataException {
+        final Long localLsn = getLatest().getMasterNodeFlushMap().get(masterLsn);
+        if (localLsn == null) {
+            throw new IllegalStateException("Component flushed before lsn mapping was received");
+        }
+        flushed(componentTimestamp, localLsn);
+    }
+
+    @Override
+    public synchronized void flushed(String componentTimestamp, long lsn) throws HyracksDataException {
+        final IndexCheckpoint latest = getLatest();
+        IndexCheckpoint nextCheckpoint = IndexCheckpoint.next(latest, lsn, componentTimestamp);
+        persist(nextCheckpoint);
+        deleteHistory(nextCheckpoint.getId(), HISTORY_CHECKPOINTS);
+    }
+
+    @Override
+    public synchronized void masterFlush(long masterLsn, long localLsn) throws HyracksDataException {
+        final IndexCheckpoint latest = getLatest();
+        latest.getMasterNodeFlushMap().put(masterLsn, localLsn);
+        final IndexCheckpoint next =
+                IndexCheckpoint.next(latest, latest.getLowWatermark(), latest.getValidComponentTimestamp());
+        persist(next);
+        notifyAll();
+    }
+
+    @Override
+    public synchronized long getLowWatermark() throws HyracksDataException {
+        return getLatest().getLowWatermark();
+    }
+
+    @Override
+    public synchronized boolean isFlushed(long masterLsn) throws HyracksDataException {
+        if (masterLsn == BULKLOAD_LSN) {
+            return true;
+        }
+        return getLatest().getMasterNodeFlushMap().containsKey(masterLsn);
+    }
+
+    @Override
+    public synchronized void advanceLowWatermark(long lsn) throws HyracksDataException {
+        flushed(getLatest().getValidComponentTimestamp(), lsn);
+    }
+
+    @Override
+    public synchronized void delete() {
+        deleteHistory(Long.MAX_VALUE, 0);
+    }
+
+    private IndexCheckpoint getLatest() {
+        final List<IndexCheckpoint> checkpoints = getCheckpoints();
+        if (checkpoints.isEmpty()) {
+            throw new IllegalStateException("Couldn't find any checkpoints for resource: " + indexPath);
+        }
+        checkpoints.sort(Comparator.comparingLong(IndexCheckpoint::getId).reversed());
+        return checkpoints.get(0);
+    }
+
+    private List<IndexCheckpoint> getCheckpoints() {
+        List<IndexCheckpoint> checkpoints = new ArrayList<>();
+        final File[] checkpointFiles = indexPath.toFile().listFiles(CHECKPOINT_FILE_FILTER);
+        if (checkpointFiles != null) {
+            for (File checkpointFile : checkpointFiles) {
+                try {
+                    checkpoints.add(read(checkpointFile.toPath()));
+                } catch (IOException e) {
+                    LOGGER.log(Level.WARNING, e, () -> "Couldn't read index checkpoint file: " + e);
+                }
+            }
+        }
+        return checkpoints;
+    }
+
+    private void persist(IndexCheckpoint checkpoint) throws HyracksDataException {
+        final Path checkpointPath = getCheckpointPath(checkpoint);
+        for (int i = 1; i <= MAX_CHECKPOINT_WRITE_ATTEMPTS; i++) {
+            try {
+                // clean up from previous write failure
+                if (checkpointPath.toFile().exists()) {
+                    Files.delete(checkpointPath);
+                }
+                try (BufferedWriter writer = Files.newBufferedWriter(checkpointPath)) {
+                    writer.write(checkpoint.asJson());
+                }
+                // ensure it was written correctly by reading it
+                read(checkpointPath);
+            } catch (IOException e) {
+                if (i == MAX_CHECKPOINT_WRITE_ATTEMPTS) {
+                    throw HyracksDataException.create(e);
+                }
+                LOGGER.log(Level.WARNING, e, () -> "Filed to write checkpoint at: " + indexPath);
+                int nextAttempt = i + 1;
+                LOGGER.info(() -> "Checkpoint write attempt " + nextAttempt + "/" + MAX_CHECKPOINT_WRITE_ATTEMPTS);
+            }
+        }
+    }
+
+    private IndexCheckpoint read(Path checkpointPath) throws IOException {
+        return IndexCheckpoint.fromJson(new String(Files.readAllBytes(checkpointPath)));
+    }
+
+    private void deleteHistory(long latestId, int historyToKeep) {
+        try {
+            final File[] checkpointFiles = indexPath.toFile().listFiles(CHECKPOINT_FILE_FILTER);
+            if (checkpointFiles != null) {
+                for (File checkpointFile : checkpointFiles) {
+                    if (getCheckpointIdFromFileName(checkpointFile.toPath()) < (latestId - historyToKeep)) {
+                        Files.delete(checkpointFile.toPath());
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, e, () -> "Couldn't delete history checkpoints at " + indexPath);
+        }
+    }
+
+    private Path getCheckpointPath(IndexCheckpoint checkpoint) {
+        return Paths.get(indexPath.toString(),
+                StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX + String.valueOf(checkpoint.getId()));
+    }
+
+    private long getCheckpointIdFromFileName(Path checkpointPath) {
+        return Long.valueOf(checkpointPath.getFileName().toString()
+                .substring(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX.length()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
new file mode 100644
index 0000000..19ad8f6
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc;
+
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+
+public class IndexCheckpointManagerProvider implements IIndexCheckpointManagerProvider {
+
+    private final Map<ResourceReference, IndexCheckpointManager> indexCheckpointManagerMap = new HashMap<>();
+    private final IIOManager ioManager;
+
+    public IndexCheckpointManagerProvider(IIOManager ioManager) {
+        this.ioManager = ioManager;
+    }
+
+    @Override
+    public IIndexCheckpointManager get(ResourceReference ref) throws HyracksDataException {
+        synchronized (indexCheckpointManagerMap) {
+            return indexCheckpointManagerMap.computeIfAbsent(ref, this::create);
+        }
+    }
+
+    @Override
+    public void close(ResourceReference ref) {
+        synchronized (indexCheckpointManagerMap) {
+            indexCheckpointManagerMap.remove(ref);
+        }
+    }
+
+    private IndexCheckpointManager create(ResourceReference ref) {
+        try {
+            final Path indexPath = getIndexPath(ref);
+            return new IndexCheckpointManager(indexPath);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    private Path getIndexPath(ResourceReference indexRef) throws HyracksDataException {
+        return ioManager.resolve(indexRef.getRelativePath().toString()).getFile().toPath();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 5cae2d6..b6bf2df 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -60,6 +60,7 @@ import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.replication.IReplicationChannel;
 import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.IStorageSubsystem;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.IRecoveryManager;
@@ -142,6 +143,7 @@ public class NCAppRuntimeContext implements INcApplicationContext {
     private final IStorageComponentProvider componentProvider;
     private IHyracksClientConnection hcc;
     private IStorageSubsystem storageSubsystem;
+    private IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
 
     public NCAppRuntimeContext(INCServiceContext ncServiceContext, List<AsterixExtension> extensions)
             throws AsterixException, InstantiationException, IllegalAccessException, ClassNotFoundException,
@@ -182,11 +184,11 @@ public class NCAppRuntimeContext implements INcApplicationContext {
         lsmIOScheduler = AsynchronousScheduler.INSTANCE;
 
         metadataMergePolicyFactory = new PrefixMergePolicyFactory();
+        indexCheckpointManagerProvider = new IndexCheckpointManagerProvider(ioManager);
 
         ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory =
                 new PersistentLocalResourceRepositoryFactory(ioManager, getServiceContext().getNodeId(),
-                        metadataProperties);
-
+                        metadataProperties, indexCheckpointManagerProvider);
         localResourceRepository =
                 (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory.createRepository();
 
@@ -203,11 +205,10 @@ public class NCAppRuntimeContext implements INcApplicationContext {
             }
             localResourceRepository.deleteStorageData();
         }
-
         datasetMemoryManager = new DatasetMemoryManager(storageProperties);
         datasetLifecycleManager =
                 new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
-                        datasetMemoryManager, ioManager.getIODevices().size());
+                        datasetMemoryManager, indexCheckpointManagerProvider, ioManager.getIODevices().size());
         final String nodeId = getServiceContext().getNodeId();
         final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
         final Set<Integer> nodePartitionsIds = Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId)
@@ -220,7 +221,8 @@ public class NCAppRuntimeContext implements INcApplicationContext {
 
         if (replicationProperties.isParticipant(getServiceContext().getNodeId())) {
 
-            replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties);
+            replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties,
+                    indexCheckpointManagerProvider);
 
             replicationManager = new ReplicationManager(nodeId, replicationProperties, replicaResourcesManager,
                     txnSubsystem.getLogManager(), asterixAppRuntimeContextProvider);
@@ -229,13 +231,13 @@ public class NCAppRuntimeContext implements INcApplicationContext {
             //LogManager to replicate logs
             txnSubsystem.getLogManager().setReplicationManager(replicationManager);
 
-            //PersistentLocalResourceRepository to replicate metadata files and delete backups on drop index
+            //PersistentLocalResourceRepository to replicated metadata files and delete backups on drop index
             localResourceRepository.setReplicationManager(replicationManager);
 
             /*
              * add the partitions that will be replicated in this node as inactive partitions
              */
-            //get nodes which replicate to this node
+            //get nodes which replicated to this node
             Set<String> remotePrimaryReplicas = replicationProperties.getRemotePrimaryReplicasIds(nodeId);
             for (String clientId : remotePrimaryReplicas) {
                 //get the partitions of each client
@@ -529,4 +531,9 @@ public class NCAppRuntimeContext implements INcApplicationContext {
     public IStorageSubsystem getStorageSubsystem() {
         return storageSubsystem;
     }
+
+    @Override
+    public IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
+        return indexCheckpointManagerProvider;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index e29e3fe..f0ed5e9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -43,11 +43,14 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.transactions.Checkpoint;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ICheckpointManager;
@@ -293,6 +296,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
         IAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
         IDatasetLifecycleManager datasetLifecycleManager = appRuntimeContext.getDatasetLifecycleManager();
+        final IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
+                ((INcApplicationContext) (serviceCtx.getApplicationContext())).getIndexCheckpointManagerProvider();
 
         Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources();
         Map<Long, Long> resourceId2MaxLSNMap = new HashMap<>();
@@ -356,18 +361,15 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                                     index = (ILSMIndex) localResourceMetadata.createInstance(serviceCtx);
                                     datasetLifecycleManager.register(localResource.getPath(), index);
                                     datasetLifecycleManager.open(localResource.getPath());
-
-                                    //#. get maxDiskLastLSN
-                                    ILSMIndex lsmIndex = index;
                                     try {
+                                        final DatasetResourceReference resourceReference =
+                                                DatasetResourceReference.of(localResource);
                                         maxDiskLastLsn =
-                                                ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
-                                                        .getComponentLSN(lsmIndex.getDiskComponents());
+                                                indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
                                     } catch (HyracksDataException e) {
                                         datasetLifecycleManager.close(localResource.getPath());
                                         throw e;
                                     }
-
                                     //#. set resourceId and maxDiskLastLSN to the map
                                     resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
                                 } else {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
index fea6cd8..4bfc581 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
@@ -22,6 +22,8 @@ import java.util.List;
 
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -58,7 +60,7 @@ public class TestLsmBtreeIoOpCallbackFactory extends LSMBTreeIOOperationCallback
         // Whenever this is called, it resets the counter
         // However, the counters for the failed operations are never reset since we expect them
         // To be always 0
-        return new TestLsmBtreeIoOpCallback(index, getComponentIdGenerator());
+        return new TestLsmBtreeIoOpCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider());
     }
 
     public int getTotalFlushes() {
@@ -100,8 +102,9 @@ public class TestLsmBtreeIoOpCallbackFactory extends LSMBTreeIOOperationCallback
     public class TestLsmBtreeIoOpCallback extends LSMBTreeIOOperationCallback {
         private final TestLsmBtree lsmBtree;
 
-        public TestLsmBtreeIoOpCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
-            super(index, idGenerator);
+        public TestLsmBtreeIoOpCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator,
+                IIndexCheckpointManagerProvider checkpointManagerProvider) {
+            super(index, idGenerator, checkpointManagerProvider);
             lsmBtree = (TestLsmBtree) index;
         }
 
@@ -121,7 +124,8 @@ public class TestLsmBtreeIoOpCallbackFactory extends LSMBTreeIOOperationCallback
         }
 
         @Override
-        public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) {
+        public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent)
+                throws HyracksDataException {
             lsmBtree.afterIoFinalizeCalled();
             super.afterFinalize(opType, newComponent);
             synchronized (TestLsmBtreeIoOpCallbackFactory.this) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/pom.xml b/asterixdb/asterix-common/pom.xml
index ad83d60..b909e91 100644
--- a/asterixdb/asterix-common/pom.xml
+++ b/asterixdb/asterix-common/pom.xml
@@ -225,18 +225,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-storage-am-btree</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-storage-am-bloomfilter</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-storage-am-rtree</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-api</artifactId>
     </dependency>
     <dependency>
@@ -245,10 +237,6 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-storage-am-lsm-invertedindex</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-storage-am-lsm-rtree</artifactId>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 162e693..0503c09 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -28,7 +28,9 @@ import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.replication.IReplicationChannel;
 import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.IStorageSubsystem;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.INCServiceContext;
@@ -118,4 +120,6 @@ public interface INcApplicationContext extends IApplicationContext {
     INCServiceContext getServiceContext();
 
     IStorageSubsystem getStorageSubsystem();
+
+    IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 9f57981..9ec13ef 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -46,7 +46,7 @@ public class BaseOperationTracker implements ITransactionOperationTracker {
     @Override
     public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
-        if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.REPLICATE) {
+        if (opType == LSMOperationType.REPLICATE) {
             dsInfo.undeclareActiveIOOperation();
         }
     }
@@ -54,14 +54,11 @@ public class BaseOperationTracker implements ITransactionOperationTracker {
     @Override
     public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
-        if (opType == LSMOperationType.MERGE) {
+        if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
             dsInfo.undeclareActiveIOOperation();
         }
     }
 
-    public void exclusiveJobCommitted() throws HyracksDataException {
-    }
-
     @Override
     public void beforeTransaction(long resourceId) {
         /*

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index ce43bca..6a1ebfb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -35,6 +35,8 @@ import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.utils.StoragePathUtil;
@@ -64,13 +66,16 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     private final LogRecord logRecord;
     private final int numPartitions;
     private volatile boolean stopped = false;
+    private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
 
     public DatasetLifecycleManager(StorageProperties storageProperties, ILocalResourceRepository resourceRepository,
-            ILogManager logManager, IDatasetMemoryManager memoryManager, int numPartitions) {
+            ILogManager logManager, IDatasetMemoryManager memoryManager,
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider, int numPartitions) {
         this.logManager = logManager;
         this.storageProperties = storageProperties;
         this.resourceRepository = resourceRepository;
         this.memoryManager = memoryManager;
+        this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
         this.numPartitions = numPartitions;
         logRecord = new LogRecord();
     }
@@ -149,12 +154,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
         DatasetInfo dsInfo = dsr.getDatasetInfo();
         dsInfo.waitForIO();
-        if (iInfo.isOpen()) {
-            ILSMOperationTracker indexOpTracker = iInfo.getIndex().getOperationTracker();
-            synchronized (indexOpTracker) {
-                iInfo.getIndex().deactivate(false);
-            }
-        }
+        closeIndex(iInfo);
         dsInfo.getIndexes().remove(resourceID);
         if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
                 && !dsInfo.isExternal()) {
@@ -451,13 +451,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
             throw HyracksDataException.create(e);
         }
         for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
-            if (iInfo.isOpen()) {
-                ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
-                synchronized (opTracker) {
-                    iInfo.getIndex().deactivate(false);
-                }
-                iInfo.setOpen(false);
-            }
+            closeIndex(iInfo);
         }
         removeDatasetFromCache(dsInfo.getDatasetID());
         dsInfo.setOpen(false);
@@ -579,4 +573,15 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
             }
         }
     }
+
+    private void closeIndex(IndexInfo indexInfo) throws HyracksDataException {
+        if (indexInfo.isOpen()) {
+            ILSMOperationTracker opTracker = indexInfo.getIndex().getOperationTracker();
+            synchronized (opTracker) {
+                indexInfo.getIndex().deactivate(false);
+            }
+            indexCheckpointManagerProvider.close(DatasetResourceReference.of(indexInfo.getLocalResource()));
+            indexInfo.setOpen(false);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index f6e2b0d..c02de7e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -108,7 +108,7 @@ public class DatasetResource implements Comparable<DatasetResource> {
         if (index == null) {
             throw new HyracksDataException("Attempt to register a null index");
         }
-        datasetInfo.getIndexes().put(resourceID, new IndexInfo(index, datasetInfo.getDatasetID(), resourceID,
+        datasetInfo.getIndexes().put(resourceID, new IndexInfo(index, datasetInfo.getDatasetID(), resource,
                 ((DatasetLocalResource) resource.getResource()).getPartition()));
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
index 9eb5b6c..b094b6f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
@@ -19,17 +19,20 @@
 package org.apache.asterix.common.context;
 
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.LocalResource;
 
 public class IndexInfo extends Info {
     private final ILSMIndex index;
     private final int datasetId;
     private final long resourceId;
     private final int partition;
+    private final LocalResource localResource;
 
-    public IndexInfo(ILSMIndex index, int datasetId, long resourceId, int partition) {
+    public IndexInfo(ILSMIndex index, int datasetId, LocalResource localResource, int partition) {
         this.index = index;
         this.datasetId = datasetId;
-        this.resourceId = resourceId;
+        this.localResource = localResource;
+        this.resourceId = localResource.getId();
         this.partition = partition;
     }
 
@@ -48,4 +51,8 @@ public class IndexInfo extends Info {
     public int getDatasetId() {
         return datasetId;
     }
+
+    public LocalResource getLocalResource() {
+        return localResource;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index ababe9c..14e91ba 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -73,7 +73,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
     public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         // Searches are immediately considered complete, because they should not prevent the execution of flushes.
-        if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.REPLICATE) {
+        if (opType == LSMOperationType.REPLICATE) {
             completeOperation(index, opType, searchCallback, modificationCallback);
         }
     }
@@ -160,12 +160,6 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
         flushLogCreated = false;
     }
 
-    @Override
-    public void exclusiveJobCommitted() throws HyracksDataException {
-        numActiveOperations.set(0);
-        flushIfRequested();
-    }
-
     public int getNumActiveOperations() {
         return numActiveOperations.get();
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
index 04090bb..e844192 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
@@ -41,11 +41,4 @@ public class LSMIndexUtil {
             }
         }
     }
-
-    public static long getComponentFileLSNOffset(ILSMIndex lsmIndex, ILSMDiskComponent lsmComponent,
-            String componentFilePath) throws HyracksDataException {
-        AbstractLSMIOOperationCallback ioOpCallback =
-                (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback();
-        return ioOpCallback.getComponentFileLSNOffset(lsmComponent, componentFilePath);
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index 1432f25..c625988 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -19,8 +19,13 @@
 
 package org.apache.asterix.common.ioopcallbacks;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.primitive.LongPointable;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
@@ -33,6 +38,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperati
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
 import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
 import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
@@ -61,10 +67,14 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC
     protected ILSMComponentId[] nextComponentIds;
 
     protected final ILSMComponentIdGenerator idGenerator;
+    private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
+    private final Map<ILSMComponentId, Long> componentLsnMap = new HashMap<>();
 
-    public AbstractLSMIOOperationCallback(ILSMIndex lsmIndex, ILSMComponentIdGenerator idGenerator) {
+    public AbstractLSMIOOperationCallback(ILSMIndex lsmIndex, ILSMComponentIdGenerator idGenerator,
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
         this.lsmIndex = lsmIndex;
         this.idGenerator = idGenerator;
+        this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
         int count = lsmIndex.getNumberOfAllMemoryComponents();
         mutableLastLSNs = new long[count];
         firstLSNs = new long[count];
@@ -104,42 +114,59 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC
     public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents,
             ILSMDiskComponent newComponent) throws HyracksDataException {
         //TODO: Copying Filters and all content of the metadata pages for flush operation should be done here
-        if (newComponent != null) {
-            putLSNIntoMetadata(newComponent, oldComponents);
-            putComponentIdIntoMetadata(opType, newComponent, oldComponents);
-            if (opType == LSMIOOperationType.MERGE) {
-                // In case of merge, oldComponents are never null
-                LongPointable markerLsn =
-                        LongPointable.FACTORY.createPointable(ComponentUtils.getLong(oldComponents.get(0).getMetadata(),
-                                ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND));
-                newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn);
-            } else if (opType == LSMIOOperationType.FLUSH) {
-                // advance memory component indexes
-                synchronized (this) {
-                    // we've already consumed the specified LSN/component id.
-                    // Now we can advance to the next component
-                    flushRequested[readIndex] = false;
-                    // if the component which just finished flushing is the component that will be modified next,
-                    // we set its first LSN to its previous LSN
-                    if (readIndex == writeIndex) {
-                        firstLSNs[writeIndex] = mutableLastLSNs[writeIndex];
-                    }
-                    readIndex = (readIndex + 1) % mutableLastLSNs.length;
+        if (newComponent == null) {
+            // failed operation. Nothing to do.
+            return;
+        }
+        putLSNIntoMetadata(newComponent, oldComponents);
+        putComponentIdIntoMetadata(opType, newComponent, oldComponents);
+        componentLsnMap.put(newComponent.getId(), getComponentLSN(oldComponents));
+        if (opType == LSMIOOperationType.MERGE) {
+            if (oldComponents == null) {
+                throw new IllegalStateException("Merge must have old components");
+            }
+            LongPointable markerLsn = LongPointable.FACTORY.createPointable(ComponentUtils
+                    .getLong(oldComponents.get(0).getMetadata(), ComponentUtils.MARKER_LSN_KEY,
+                            ComponentUtils.NOT_FOUND));
+            newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn);
+        } else if (opType == LSMIOOperationType.FLUSH) {
+            // advance memory component indexes
+            synchronized (this) {
+                // we've already consumed the specified LSN/component id.
+                // Now we can advance to the next component
+                flushRequested[readIndex] = false;
+                // if the component which just finished flushing is the component that will be modified next,
+                // we set its first LSN to its previous LSN
+                if (readIndex == writeIndex) {
+                    firstLSNs[writeIndex] = mutableLastLSNs[writeIndex];
                 }
+                readIndex = (readIndex + 1) % mutableLastLSNs.length;
             }
         }
     }
 
     @Override
-    public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) {
+    public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) throws HyracksDataException {
         // The operation was complete and the next I/O operation for the LSM index didn't start yet
         if (opType == LSMIOOperationType.FLUSH) {
             hasFlushed = true;
+            if (newComponent != null) {
+                final Long lsn = componentLsnMap.remove(newComponent.getId());
+                if (lsn == null) {
+                    throw new IllegalStateException("Unidentified flushed component: " + newComponent);
+                }
+                // empty component doesn't have any files
+                final Optional<String> componentFile = newComponent.getLSMComponentPhysicalFiles().stream().findAny();
+                if (componentFile.isPresent()) {
+                    final ResourceReference ref = ResourceReference.of(componentFile.get());
+                    final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(ref.getName());
+                    indexCheckpointManagerProvider.get(ref).flushed(componentEndTime, lsn);
+                }
+            }
         }
-
     }
 
-    public void putLSNIntoMetadata(ILSMDiskComponent newComponent, List<ILSMComponent> oldComponents)
+    private void putLSNIntoMetadata(ILSMDiskComponent newComponent, List<ILSMComponent> oldComponents)
             throws HyracksDataException {
         newComponent.getMetadata().put(LSN_KEY, LongPointable.FACTORY.createPointable(getComponentLSN(oldComponents)));
     }
@@ -155,8 +182,8 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC
         if (mergedComponents == null || mergedComponents.isEmpty()) {
             return null;
         }
-        return LSMComponentIdUtils.union(mergedComponents.get(0).getId(),
-                mergedComponents.get(mergedComponents.size() - 1).getId());
+        return LSMComponentIdUtils
+                .union(mergedComponents.get(0).getId(), mergedComponents.get(mergedComponents.size() - 1).getId());
 
     }
 
@@ -186,7 +213,7 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC
         }
     }
 
-    public void setFirstLSN(long firstLSN) {
+    public synchronized void setFirstLSN(long firstLSN) {
         // We make sure that this method is only called on an empty component so the first LSN is not set incorrectly
         firstLSNs[writeIndex] = firstLSN;
     }
@@ -212,8 +239,7 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC
             // Implies a flush IO operation. --> moves the flush pointer
             // Flush operation of an LSM index are executed sequentially.
             synchronized (this) {
-                long lsn = mutableLastLSNs[readIndex];
-                return lsn;
+                return mutableLastLSNs[readIndex];
             }
         }
         // Get max LSN from the diskComponents. Implies a merge IO operation or Recovery operation.
@@ -246,15 +272,4 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC
             component.resetId(componentId);
         }
     }
-
-    /**
-     * @param component
-     * @param componentFilePath
-     * @return The LSN byte offset in the LSM disk component if the index is valid,
-     *         otherwise {@link IMetadataPageManager#INVALID_LSN_OFFSET}.
-     * @throws HyracksDataException
-     */
-    public abstract long getComponentFileLSNOffset(ILSMDiskComponent component, String componentFilePath)
-            throws HyracksDataException;
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
index 5dff7f4..ed56ab1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
@@ -21,6 +21,8 @@ package org.apache.asterix.common.ioopcallbacks;
 
 import java.io.ObjectStreamException;
 
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
@@ -49,6 +51,10 @@ public abstract class AbstractLSMIndexIOOperationCallbackFactory implements ILSM
         return idGeneratorFactory.getComponentIdGenerator(ncCtx);
     }
 
+    protected IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
+        return ((INcApplicationContext) ncCtx.getApplicationContext()).getIndexCheckpointManagerProvider();
+    }
+
     private void readObjectNoData() throws ObjectStreamException {
         idGeneratorFactory = new ILSMComponentIdGeneratorFactory() {
             private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index c1ee03b..db6c609 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -19,28 +19,14 @@
 
 package org.apache.asterix.common.ioopcallbacks;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.btree.impls.BTree;
-import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
-import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeFileManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
 public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
 
-    public LSMBTreeIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
-        super(index, idGenerator);
-    }
-
-    @Override
-    public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent, String diskComponentFilePath)
-            throws HyracksDataException {
-        if (diskComponentFilePath.endsWith(LSMBTreeFileManager.BTREE_SUFFIX)) {
-            IMetadataPageManager metadataPageManager =
-                    (IMetadataPageManager) ((BTree) diskComponent.getIndex()).getPageManager();
-            return metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(), LSN_KEY);
-        }
-        return INVALID;
+    public LSMBTreeIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator,
+            IIndexCheckpointManagerProvider checkpointManagerProvider) {
+        super(index, idGenerator, checkpointManagerProvider);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
index 4ef12ef..95245cb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
@@ -33,6 +33,6 @@ public class LSMBTreeIOOperationCallbackFactory extends AbstractLSMIndexIOOperat
 
     @Override
     public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
-        return new LSMBTreeIOOperationCallback(index, getComponentIdGenerator());
+        return new LSMBTreeIOOperationCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider());
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
index b43fb2f..da1446b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
@@ -18,28 +18,14 @@
  */
 package org.apache.asterix.common.ioopcallbacks;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.btree.impls.BTree;
-import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
-import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyFileManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
 public class LSMBTreeWithBuddyIOOperationCallback extends AbstractLSMIOOperationCallback {
 
-    public LSMBTreeWithBuddyIOOperationCallback(ILSMIndex lsmIndex, ILSMComponentIdGenerator idGenerator) {
-        super(lsmIndex, idGenerator);
-    }
-
-    @Override
-    public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent, String diskComponentFilePath)
-            throws HyracksDataException {
-        if (diskComponentFilePath.endsWith(LSMBTreeWithBuddyFileManager.BTREE_SUFFIX)) {
-            IMetadataPageManager metadataPageManager =
-                    (IMetadataPageManager) ((BTree) diskComponent.getIndex()).getPageManager();
-            return metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(), LSN_KEY);
-        }
-        return INVALID;
+    public LSMBTreeWithBuddyIOOperationCallback(ILSMIndex lsmIndex, ILSMComponentIdGenerator idGenerator,
+            IIndexCheckpointManagerProvider checkpointManagerProvider) {
+        super(lsmIndex, idGenerator, checkpointManagerProvider);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
index 6727bf6..6c75ed6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
@@ -32,6 +32,7 @@ public class LSMBTreeWithBuddyIOOperationCallbackFactory extends AbstractLSMInde
 
     @Override
     public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
-        return new LSMBTreeWithBuddyIOOperationCallback(index, getComponentIdGenerator());
+        return new LSMBTreeWithBuddyIOOperationCallback(index, getComponentIdGenerator(),
+                getIndexCheckpointManagerProvider());
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 015cd38..3ba9bcd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -19,29 +19,14 @@
 
 package org.apache.asterix.common.ioopcallbacks;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent;
-import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexFileManager;
 
 public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationCallback {
 
-    public LSMInvertedIndexIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
-        super(index, idGenerator);
-    }
-
-    @Override
-    public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent, String diskComponentFilePath)
-            throws HyracksDataException {
-        if (diskComponentFilePath.endsWith(LSMInvertedIndexFileManager.DELETED_KEYS_BTREE_SUFFIX)) {
-            LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) diskComponent;
-            IMetadataPageManager metadataPageManager =
-                    (IMetadataPageManager) invIndexComponent.getBuddyIndex().getPageManager();
-            return metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(), LSN_KEY);
-        }
-        return INVALID;
+    public LSMInvertedIndexIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator,
+            IIndexCheckpointManagerProvider checkpointManagerProvider) {
+        super(index, idGenerator, checkpointManagerProvider);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
index a2712d1..fb73d19 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
@@ -33,6 +33,7 @@ public class LSMInvertedIndexIOOperationCallbackFactory extends AbstractLSMIndex
 
     @Override
     public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
-        return new LSMInvertedIndexIOOperationCallback(index, getComponentIdGenerator());
+        return new LSMInvertedIndexIOOperationCallback(index, getComponentIdGenerator(),
+                getIndexCheckpointManagerProvider());
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index bc79074..f3e80ec 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -19,28 +19,14 @@
 
 package org.apache.asterix.common.ioopcallbacks;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.rtree.impls.LSMRTreeFileManager;
-import org.apache.hyracks.storage.am.rtree.impls.RTree;
 
 public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
 
-    public LSMRTreeIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
-        super(index, idGenerator);
-    }
-
-    @Override
-    public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent, String diskComponentFilePath)
-            throws HyracksDataException {
-        if (diskComponentFilePath.endsWith(LSMRTreeFileManager.RTREE_SUFFIX)) {
-            IMetadataPageManager metadataPageManager =
-                    (IMetadataPageManager) ((RTree) diskComponent.getIndex()).getPageManager();
-            return metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(), LSN_KEY);
-        }
-        return INVALID;
+    public LSMRTreeIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator,
+            IIndexCheckpointManagerProvider checkpointManagerProvodier) {
+        super(index, idGenerator, checkpointManagerProvodier);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
index 087aaae..94be0bb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
@@ -33,6 +33,6 @@ public class LSMRTreeIOOperationCallbackFactory extends AbstractLSMIndexIOOperat
 
     @Override
     public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
-        return new LSMRTreeIOOperationCallback(index, getComponentIdGenerator());
+        return new LSMRTreeIOOperationCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider());
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
index d05321e..c488b65 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
@@ -28,6 +28,7 @@ public class DatasetResourceReference extends ResourceReference {
 
     private int datasetId;
     private int partitionId;
+    private long resourceId;
 
     private DatasetResourceReference() {
         super();
@@ -53,6 +54,10 @@ public class DatasetResourceReference extends ResourceReference {
         return partitionId;
     }
 
+    public long getResourceId() {
+        return resourceId;
+    }
+
     private static DatasetResourceReference parse(LocalResource localResource) {
         final DatasetResourceReference datasetResourceReference = new DatasetResourceReference();
         final String filePath = Paths.get(localResource.getPath(), StorageConstants.METADATA_FILE_NAME).toString();
@@ -73,5 +78,28 @@ public class DatasetResourceReference extends ResourceReference {
         final DatasetLocalResource dsResource = (DatasetLocalResource) localResource.getResource();
         lrr.datasetId = dsResource.getDatasetId();
         lrr.partitionId = dsResource.getPartition();
+        lrr.resourceId = localResource.getId();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o != null && o instanceof ResourceReference) {
+            ResourceReference that = (ResourceReference) o;
+            return getRelativePath().toString().equals(that.getRelativePath().toString());
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return getRelativePath().toString().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return getRelativePath().toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
new file mode 100644
index 0000000..afa3823
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IIndexCheckpointManager {
+
+    /**
+     * Initializes the first checkpoint of an index with low watermark {@code lsn}
+     *
+     * @param lsn
+     * @throws HyracksDataException
+     */
+    void init(long lsn) throws HyracksDataException;
+
+    /**
+     * Called when a new LSM disk component is flushed. When called,  the index checkpoiint is updated
+     * with the latest valid {@code componentTimestamp} and low watermark {@code lsn}
+     *
+     * @param componentTimestamp
+     * @param lsn
+     * @throws HyracksDataException
+     */
+    void flushed(String componentTimestamp, long lsn) throws HyracksDataException;
+
+    /**
+     * Called when a new LSM disk component is replicated from master. When called,  the index checkpoiint is updated
+     * with the latest valid {@code componentTimestamp} and the local lsn mapping of {@code masterLsn} is set as the
+     * new low watermark.
+     *
+     * @param componentTimestamp
+     * @param masterLsn
+     * @throws HyracksDataException
+     */
+    void replicated(String componentTimestamp, long masterLsn) throws HyracksDataException;
+
+    /**
+     * Called when a flush log is received and replicated from master. The mapping between
+     * {@code masterLsn} and {@code localLsn} is updated in the checkpoint.
+     *
+     * @param masterLsn
+     * @param localLsn
+     * @throws HyracksDataException
+     */
+    void masterFlush(long masterLsn, long localLsn) throws HyracksDataException;
+
+    /**
+     * The index low watermark
+     *
+     * @return The low watermark
+     * @throws HyracksDataException
+     */
+    long getLowWatermark() throws HyracksDataException;
+
+    /**
+     * True if a mapping exists between {@code masterLsn} and a localLsn. Otherwise false.
+     *
+     * @param masterLsn
+     * @return True if the mapping exists. Otherwise false.
+     * @throws HyracksDataException
+     */
+    boolean isFlushed(long masterLsn) throws HyracksDataException;
+
+    /**
+     * Advance the index low watermark to {@code lsn}
+     *
+     * @param lsn
+     * @throws HyracksDataException
+     */
+    void advanceLowWatermark(long lsn) throws HyracksDataException;
+
+    /**
+     * Deletes all checkpoints
+     */
+    void delete();
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManagerProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManagerProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManagerProvider.java
new file mode 100644
index 0000000..e6cef57
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManagerProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IIndexCheckpointManagerProvider {
+
+    /**
+     * Gets {@link IIndexCheckpointManager} for the index referenced by {@code ref}
+     *
+     * @param ref
+     * @return The index checkpoint manager.
+     * @throws HyracksDataException
+     */
+    IIndexCheckpointManager get(ResourceReference ref) throws HyracksDataException;
+
+    /**
+     * Closes any resources used by the index checkpoint manager referenced by {@code ref}
+     *
+     * @param ref
+     */
+    void close(ResourceReference ref);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
new file mode 100644
index 0000000..6e845e1
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class IndexCheckpoint {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final long INITIAL_CHECKPOINT_ID = 0;
+    private long id;
+    private String validComponentTimestamp;
+    private long lowWatermark;
+    private Map<Long, Long> masterNodeFlushMap;
+
+    public static IndexCheckpoint first(long lowWatermark) {
+        IndexCheckpoint firstCheckpoint = new IndexCheckpoint();
+        firstCheckpoint.id = INITIAL_CHECKPOINT_ID;
+        firstCheckpoint.lowWatermark = lowWatermark;
+        firstCheckpoint.validComponentTimestamp = null;
+        firstCheckpoint.masterNodeFlushMap = new HashMap<>();
+        return firstCheckpoint;
+    }
+
+    public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, String validComponentTimestamp) {
+        if (lowWatermark < latest.getLowWatermark()) {
+            throw new IllegalStateException("Low watermark should always be increasing");
+        }
+        IndexCheckpoint next = new IndexCheckpoint();
+        next.id = latest.getId() + 1;
+        next.lowWatermark = lowWatermark;
+        next.validComponentTimestamp = validComponentTimestamp;
+        next.masterNodeFlushMap = latest.getMasterNodeFlushMap();
+        // remove any lsn from the map that wont be used anymore
+        next.masterNodeFlushMap.values().removeIf(lsn -> lsn <= lowWatermark);
+        return next;
+    }
+
+    @JsonCreator
+    private IndexCheckpoint() {
+    }
+
+    public String getValidComponentTimestamp() {
+        return validComponentTimestamp;
+    }
+
+    public long getLowWatermark() {
+        return lowWatermark;
+    }
+
+    public Map<Long, Long> getMasterNodeFlushMap() {
+        return masterNodeFlushMap;
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    public String asJson() throws HyracksDataException {
+        try {
+            return OBJECT_MAPPER.writeValueAsString(this);
+        } catch (JsonProcessingException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static IndexCheckpoint fromJson(String json) throws HyracksDataException {
+        try {
+            return OBJECT_MAPPER.readValue(json, IndexCheckpoint.class);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
index bd057fa..4aa6982 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -107,4 +107,26 @@ public class ResourceReference {
         ref.root = tokens[--offset];
         ref.rebalance = String.valueOf(0);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o != null && o instanceof ResourceReference) {
+            ResourceReference that = (ResourceReference) o;
+            return getRelativePath().toString().equals(that.getRelativePath().toString());
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return getRelativePath().toString().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return getRelativePath().toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 6262f71..220b089 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.utils;
 
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 
 /**
  * A static class that stores storage constants
@@ -27,6 +28,12 @@ public class StorageConstants {
 
     public static final String STORAGE_ROOT_DIR_NAME = "storage";
     public static final String PARTITION_DIR_PREFIX = "partition_";
+    /**
+     * Any file that shares the same directory as the LSM index files must
+     * begin with ".". Otherwise {@link AbstractLSMIndexFileManager} will try to
+     * use them as index files.
+     */
+    public static final String INDEX_CHECKPOINT_FILE_PREFIX = ".idx_checkpoint_";
     public static final String METADATA_FILE_NAME = ".metadata";
     public static final String LEGACY_DATASET_INDEX_NAME_SEPARATOR = "_idx_";
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
index 2928d90..3aa7b17 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
@@ -41,6 +41,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
+import org.apache.hyracks.storage.common.LocalResource;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -237,8 +238,8 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase {
 
         Mockito.when(index.createAccessor(Mockito.any(IIndexAccessParameters.class))).thenReturn(accessor);
         Mockito.when(index.isPrimaryIndex()).thenReturn(isPrimary);
-
-        return new IndexInfo(index, DATASET_ID, 0, partition);
+        final LocalResource localResource = Mockito.mock(LocalResource.class);
+        return new IndexInfo(index, DATASET_ID, localResource, partition);
     }
 
 }


Mime
View raw message